Arrow / Mbed OS DAPLink Reset
Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers daplink_board.py Source File

daplink_board.py

00001 #
00002 # DAPLink Interface Firmware
00003 # Copyright (c) 2009-2016, ARM Limited, All Rights Reserved
00004 # SPDX-License-Identifier: Apache-2.0
00005 #
00006 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00007 # not use this file except in compliance with the License.
00008 # You may obtain a copy of the License at
00009 #
00010 # http://www.apache.org/licenses/LICENSE-2.0
00011 #
00012 # Unless required by applicable law or agreed to in writing, software
00013 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00014 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015 # See the License for the specific language governing permissions and
00016 # limitations under the License.
00017 #
00018 
00019 from __future__ import absolute_import
00020 
00021 import os
00022 import re
00023 import time
00024 import subprocess
00025 import sys
00026 import binascii
00027 import itertools
00028 import mbed_lstools
00029 import info
00030 import test_daplink
00031 from test_info import TestInfoStub, TestInfo
00032 from intelhex import IntelHex
00033 from pyocd.core.helpers import ConnectHelper
00034 
# File names matching any of these patterns are skipped when the
# contents of the files on the drive are checked.
FILE_IGNORE_PATTERN_LIST = [
    re.compile(r"\._\.Trashes"),
]
00038 
00039 
00040 # This prevents the following error message from getting
00041 # displayed on windows if the mbed dismounts unexpectedly
00042 # during a transfer:
00043 #   There is no disk in the drive. Please insert a disk into
00044 #   drive \Device<Harddiskx><rdrive>
def disable_popup():
    """Set the SEM_FAILCRITICALERRORS bit in the Windows error mode.

    Does nothing on platforms other than Windows.  On Windows the
    current error mode is read with GetErrorMode, the
    SEM_FAILCRITICALERRORS bit is OR-ed in and the result is written
    back with SetErrorMode.
    """
    if not sys.platform.startswith("win"):
        return

    # pylint: disable=invalid-name
    import ctypes
    kernel32 = ctypes.windll.kernel32  # @UndefinedVariable

    get_error_mode = kernel32.GetErrorMode
    get_error_mode.restype = ctypes.c_uint
    get_error_mode.argtypes = []

    set_error_mode = kernel32.SetErrorMode
    set_error_mode.restype = ctypes.c_uint
    set_error_mode.argtypes = [ctypes.c_uint]

    sem_failcriticalerrors = 1
    set_error_mode(get_error_mode() | sem_failcriticalerrors)


# Applied once, as soon as the module is imported
disable_popup()
00064 
00065 
def get_all_attached_daplink_boards():
    """Return a DaplinkBoard for every attached board that reports a mode.

    Each entry listed by mbed-ls is wrapped in a DaplinkBoard.  Boards
    whose mode could not be determined are left out of the result and a
    warning is printed for them instead.
    """
    usable_boards = []
    for entry in mbed_lstools.create().list_mbeds():
        candidate = DaplinkBoard(entry['target_id'])
        if candidate._mode is None:
            # A valid DAPLink board would have had its mode set
            print("Warning: DAPLink tests cannot be done on board %s" % candidate.unique_id)
            continue
        usable_boards.append(candidate)
    return usable_boards
00078 
00079 
def _unique_id_to_host_id(unique_id):
    """Return the chip id unique to the DAPLink host processor.

    The unique ID string is laid out as follows:
    Board ID - first 4 characters
    Version  - next 4 characters
    Host ID  - the 32 characters after that

    Only the host ID portion is returned; anything beyond those 32
    characters is ignored.
    """
    host_id_start = 8
    host_id_length = 32
    return unique_id[host_id_start:host_id_start + host_id_length]
00089 
00090 
def _get_board_endpoints(unique_id):
    """Look up the endpoints of the board matching ``unique_id``.

    Boards are matched on the host id portion of their unique id.
    Returns a (unique_id, serial_port, mount_point) tuple taken from the
    first matching mbed-ls entry, or None when no entry matches.
    """
    entries = mbed_lstools.create().list_mbeds()
    wanted_host_id = _unique_id_to_host_id(unique_id)

    for entry in entries:
        endpoints = (entry['target_id'],
                     entry['serial_port'],
                     entry['mount_point'])
        if _unique_id_to_host_id(endpoints[0]) == wanted_host_id:
            return endpoints
    return None
00105 
00106 
def _ranges(i):
    """Yield a (first, last) pair for each run of values in ``i``.

    A run is a stretch of neighbouring items whose value minus position
    is constant; for a sorted list of distinct integers that is a
    stretch of consecutive numbers.
    """
    in_run = False
    run_first = run_last = run_offset = None
    for position, value in enumerate(i):
        offset = value - position
        if in_run and offset == run_offset:
            # Same run, just extend it
            run_last = value
            continue
        if in_run:
            yield run_first, run_last
        run_first = run_last = value
        run_offset = offset
        in_run = True
    if in_run:
        yield run_first, run_last
00111 
00112 
def _parse_kvp_file(file_path, parent_test=None):
    """Parse a "key: value" text file such as details.txt.

    Keys are lower-cased with spaces replaced by underscores; values are
    lower-cased and stripped of surrounding whitespace.  Lines starting
    with '#' are treated as comments and skipped.

    Args:
        file_path: Path of the file to parse.
        parent_test: Optional test object.  When given, a
            'parse_kvp_file' subtest is created on it and a failure is
            recorded for every empty, malformed or duplicate line.

    Returns:
        A dict of the parsed entries.  The dict is empty when the file
        does not exist or holds no valid entries.
    """
    test_info = None
    if parent_test is not None:
        test_info = parent_test.create_subtest('parse_kvp_file')

    def report(message):
        # Failures are only recorded when a parent test was supplied
        if test_info is not None:
            test_info.failure(message)

    kvp = {}
    if not os.path.isfile(file_path):
        return kvp

    line_format = re.compile("^([a-zA-Z0-9 ]+): +(.+)$")
    with open(file_path, "r") as file_handle:
        for line in file_handle:
            # Lines read from a file keep their terminator, so emptiness
            # has to be judged with the line ending removed.
            if not line.rstrip("\r\n"):
                report("Empty line in %s" % file_path)
                continue

            if line.startswith('#'):
                # The line is a comment
                continue

            match = line_format.match(line)
            if match is None:
                report("Invalid line: %s" % line)
                continue

            key = match.group(1).lower().replace(" ", "_")
            value = match.group(2).lower().strip()
            if key in kvp:
                # The first occurrence of a key wins
                report("Duplicate key %s" % key)
                continue
            kvp[key] = value
    return kvp
00151 
00152 
def _compute_crc(hex_file_path):
    """Return (computed_crc32, embedded_crc32) for an Intel hex image.

    The image must consist of exactly one contiguous region.  The CRC32
    is computed over every byte of that region except the last four,
    which are read back as the CRC stored in the image (least
    significant byte first).
    """
    image = IntelHex()
    image.padding = 0xFF
    image.fromfile(hex_file_path, format='hex')

    # Collapse the populated addresses into contiguous (first, last) spans
    populated = image.addresses()
    populated.sort()
    spans = list(_ranges(populated))
    regions = len(spans)
    assert regions == 1, ("Error - only 1 region allowed in "
                          "hex file %i found." % regions)
    first_addr, last_addr = spans[0]

    # The final 4 bytes hold the CRC and are left out of the checksum
    payload_size = (last_addr - first_addr + 1) - 4
    payload = image.tobinarray(start=first_addr, size=payload_size)
    computed_crc32 = binascii.crc32(payload) & 0xFFFFFFFF

    # Reassemble the stored CRC one byte at a time, lowest byte first
    stored_crc32 = 0
    crc_addresses = range(last_addr - 3, last_addr + 1)
    for shift, address in zip((0, 8, 16, 24), crc_addresses):
        stored_crc32 |= (image[address] & 0xFF) << shift
    return computed_crc32, stored_crc32
00180 
00181 
def _run_chkdsk(drive):
    """Run chkdsk on ``drive`` and return the exit code of the process.

    The text "n" is fed to the process on stdin so that a prompt, if
    one appears, is answered with no.
    """
    chkdsk = subprocess.Popen(["chkdsk", drive],
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
    answer_no = bytearray('n\r\n', encoding='ascii')
    chkdsk.communicate(input=answer_no)
    # wait() hands back the exit code of the finished process
    return chkdsk.wait()
00190 
00191 
class AssertInfo(object):
    """Read-only record of the file and line an assert was reported at."""

    def __init__(self, file_name, line_number):
        self._line = line_number
        self._file = file_name

    @property
    def line(self):
        """Line number given at construction."""
        return self._line

    @property
    def file(self):
        """File name given at construction."""
        return self._file
00205 
00206 
00207 class DaplinkBoard(object):
00208 
00209     MODE_IF = "interface"
00210     MODE_BL = "bootloader"
00211 
00212     # Keys for details.txt
00213     KEY_UNIQUE_ID = "unique_id"
00214     KEY_HIC_ID = "hic_id"
00215     KEY_MODE = "daplink_mode"
00216     KEY_BL_VERSION = "bootloader_version"
00217     KEY_IF_VERSION = "interface_version"
00218     KEY_GIT_SHA = "git_sha"
00219     KEY_LOCAL_MODS = "local_mods"
00220     KEY_USB_INTERFACES = "usb_interfaces"
00221     KEY_BL_CRC = "bootloader_crc"
00222     KEY_IF_CRC = "interface_crc"
00223     KEY_REMOUNT_COUNT = "remount_count"
00224 
00225     def __init__(self, unique_id):
00226 
00227         self.unique_id = unique_id
00228         self.details_txt = None
00229         self._mode = None
00230         self._remount_count = None
00231         self._assert = None
00232         self._check_fs_on_remount = False
00233         self._manage_assert = False
00234         self.update_board_info()
00235 
00236     def __str__(self):
00237         return "Name=%s Unique ID=%s" % (self.name, self.get_unique_id())
00238 
00239     def get_unique_id(self):
00240         return self.unique_id
00241 
00242     def get_board_id(self):
00243         return self.board_id
00244 
00245     @property
00246     def hic_id(self):
00247         return self._hic_id
00248 
00249     @property
00250     def name(self):
00251         if self.board_id in info.BOARD_ID_TO_BUILD_TARGET:
00252             board_target = info.BOARD_ID_TO_BUILD_TARGET[self.board_id]
00253         else:
00254             board_target = "Unknown"
00255         return board_target
00256 
00257     def get_serial_port(self):
00258         return self.serial_port
00259 
00260     def get_mount_point(self):
00261         return self.mount_point
00262 
00263     def get_connected(self):
00264         """Check if the board is connected"""
00265         return os.path.isdir(self.mount_point)
00266 
00267     def get_failure_message_and_type(self):
00268         """Get the failure message and types from fail.txt
00269 
00270         return None if there there is no failure
00271         """
00272         error = None
00273         error_type = None
00274         fail_file = self.get_file_path('FAIL.TXT')
00275         if not self.get_connected():
00276             raise Exception('Board not connected')
00277         if os.path.isfile(fail_file):
00278             with open(fail_file, 'r') as fail_file_handle:
00279                 msg = fail_file_handle.read()
00280                 lines = msg.splitlines()
00281                 if len(lines) == 2:
00282                     if lines[0].startswith('error: '):
00283                         error = lines[0][7:]
00284                     else:
00285                         raise Exception('Can not parse error line in FAIL.TXT')
00286                     if lines[1].startswith('type: '):
00287                         error_type = lines[1][6:]
00288                     else:
00289                         raise Exception('Can not parse type line in FAIL.TXT')
00290                 else:
00291                     raise Exception('Wrong number of lines in FAIL.TXT, expected: 2')
00292         return error, error_type
00293 
00294     def get_assert_info(self):
00295         """Return an AssertInfo if an assert occurred, else None"""
00296         return self._assert
00297 
00298     def get_mode(self):
00299         """Return either MODE_IF or MODE_BL"""
00300         assert ((self._mode is DaplinkBoard.MODE_BL) or
00301                 (self._mode is DaplinkBoard.MODE_IF))
00302         return self._mode
00303 
00304     def get_file_path(self, file_name):
00305         """Convenience function to the path to a file on the drive"""
00306         return os.path.normpath(self.mount_point + os.sep + file_name)
00307 
00308     def refresh(self, parent_test):
00309         """Remount driver to get updated contents"""
00310         refresh_filename = self.get_file_path('REFRESH.ACT')
00311         with open(refresh_filename, 'wb') as _:
00312             pass
00313         self.wait_for_remount(parent_test)
00314 
00315     def set_mode(self, mode, parent_test=None):
00316         """Set the mode to either MODE_IF or MODE_BL"""
00317         assert ((mode is DaplinkBoard.MODE_BL) or
00318                 (mode is DaplinkBoard.MODE_IF))
00319         if parent_test is None:
00320             parent_test = TestInfoStub()
00321         test_info = parent_test.create_subtest('set_mode')
00322         current_mode = self.get_mode()
00323         if current_mode is mode:
00324             # No mode change needed
00325             return
00326 
00327         if mode is self.MODE_BL:
00328             test_info.info("changing mode IF -> BL")
00329             # Create file to enter BL mode
00330             start_bl_path = self.get_file_path('START_BL.ACT')
00331             with open(start_bl_path, 'wb') as _: pass
00332         elif mode is self.MODE_IF:
00333             test_info.info("changing mode BL -> IF")
00334             # Create file to enter IF mode
00335             start_if_path = self.get_file_path('START_IF.ACT')
00336             with open(start_if_path, 'wb') as _: pass
00337         else:
00338             test_info.warning("Board is in unknown mode")
00339         self.wait_for_remount(test_info)
00340 
00341         new_mode = self.get_mode()
00342         if new_mode != mode:
00343             test_info.failure("Board in wrong mode: %s" % new_mode)
00344             raise Exception("Could not change board mode")
00345 
00346     def set_check_fs_on_remount(self, enabled):
00347         assert isinstance(enabled, bool)
00348         self._check_fs_on_remount = enabled
00349         self.set_assert_auto_manage(enabled)
00350 
00351     def set_assert_auto_manage(self, enabled):
00352         assert isinstance(enabled, bool)
00353         self.clear_assert()
00354         self._manage_assert = enabled
00355 
00356     def clear_assert(self):
00357         assert_path = self.get_file_path("ASSERT.TXT")
00358         if os.path.isfile(assert_path):
00359             os.remove(assert_path)
00360             self.wait_for_remount(TestInfoStub())
00361 
00362     def run_board_test(self, parent_test):
00363         test_daplink.daplink_test(self, parent_test)
00364 
00365     def read_target_memory(self, addr, size, resume=True):
00366         assert self.get_mode() == self.MODE_IF
00367         with ConnectHelper.session_with_chosen_probe(unique_id=self.get_unique_id(),
00368                                                      resume_on_disconnect=resume) as session:
00369             data = session.target.read_memory_block8(addr, size)
00370         return bytearray(data)
00371 
00372     def test_fs(self, parent_test):
00373         """Check if the raw filesystem is valid"""
00374         if sys.platform.startswith("win"):
00375             test_info = parent_test.create_subtest('test_fs')
00376             returncode = _run_chkdsk(self.mount_point)
00377             test_info.info('chkdsk returned %s' % returncode)
00378             if returncode != 0:
00379                 test_info.failure('Disk corrupt')
00380 
00381             # Windows 8/10 workaround - rerun chkdsk until disk caching is on
00382             # Notes about this problem:
00383             # - This is less likely to occur when the "storage" service is
00384             #     turned off and/or you are running as administrator
00385             # - When creating a directory with os.mkdir the
00386             #     following error occurs: "WindowsError: [Error 1392] The
00387             #     file or directory is corrupted and unreadable: '<directory>'"
00388             # - When creating a file with open(<filename>, "wb") the
00389             #     following error occurs: "OError: [Errno 22] invalid
00390             #     mode ('wb') or filename: '<filename>'"
00391             # - When a file or directory is created on the drive in explorer
00392             #     and you preform a refresh, the newly created file or
00393             #     directory disappears
00394             persist_test_dir = self.get_file_path("persist_test_dir")
00395             for _ in range(10):
00396                 try:
00397                     os.mkdir(persist_test_dir)
00398                 except EnvironmentError as exception:
00399                     test_info.info("cache check exception %s" % exception)
00400                 if os.path.exists(persist_test_dir):
00401                     os.rmdir(persist_test_dir)
00402                     break
00403                 test_info.info("running checkdisk to re-enable caching")
00404                 _run_chkdsk(self.mount_point)
00405             else:
00406                 raise Exception("Unable to re-enable caching")
00407 
00408         # TODO - as a future improvement add linux and mac support
00409 
00410     # Tests for the following:
00411     # 1. Correct files present                -TODO
00412     # 2. Contents of file are valid ascii
00413     # 3. Each line ends with \r\n
00414     # 4. There is no whitespace at the end of the line
00415     # 5. Each file ends with \r\n
00416     def test_fs_contents(self, parent_test):
00417         """Check if the file contents are valid"""
00418         test_info = parent_test.create_subtest('test_fs_contents')
00419         non_ascii = b'[^\x20-\x7F\r\n]'
00420         non_cr_lf = b'\r[^\n]|[^\r]\n'
00421         trail_white = b'(?:\ \r|\ \n)'
00422         end_of_file = b'\r\n$'
00423         files = os.listdir(self.mount_point)
00424         non_ascii_re = re.compile(non_ascii)
00425         non_cr_lf_re = re.compile(non_cr_lf)
00426         trail_white_re = re.compile(trail_white)
00427         end_of_file_re = re.compile(end_of_file)
00428         for filename in files:
00429             filepath = self.get_file_path(filename)
00430             if not os.path.isfile(filepath):
00431                 test_info.info("Skipping non file item %s" % filepath)
00432                 continue
00433             skip = False
00434             for pattern in FILE_IGNORE_PATTERN_LIST:
00435                 if pattern.match(filename):
00436                     skip = True
00437                     break
00438             if skip:
00439                 continue
00440 
00441             with open(filepath, 'rb') as file_handle:
00442                 file_contents = file_handle.read()
00443             if non_ascii_re.search(file_contents):
00444                 test_info.failure("Non ascii characters in %s" % filepath)
00445             elif non_cr_lf_re.search(file_contents):
00446                 test_info.failure("File has non-standard line endings %s" %
00447                                   filepath)
00448             elif trail_white_re.search(file_contents):
00449                 test_info.warning("File trailing whitespace %s" %
00450                                   filepath)
00451             elif end_of_file_re.search(file_contents) is None:
00452                 test_info.warning("No newline at end of file %s" %
00453                                   filepath)
00454             else:
00455                 test_info.info("File %s valid" % filepath)
00456 
00457         self.test_details_txt(test_info)
00458 
00459     def load_interface(self, filepath, parent_test):
00460         """Load an interface binary or hex"""
00461         assert isinstance(filepath, str), "Invalid bootloader image!"
00462         assert isinstance(parent_test, TestInfo), "Invalid parent test object!"
00463 
00464         test_info = parent_test.create_subtest('load_interface')
00465         self.set_mode(self.MODE_BL, test_info)
00466 
00467         data_crc, crc_in_image = _compute_crc(filepath)
00468         assert data_crc == crc_in_image, ("CRC in interface is wrong "
00469                                           "expected 0x%x, found 0x%x" %
00470                                           (data_crc, crc_in_image))
00471 
00472         filename = os.path.basename(filepath)
00473         with open(filepath, 'rb') as firmware_file:
00474             data = firmware_file.read()
00475         out_file = self.get_file_path(filename)
00476         start = time.time()
00477         with open(out_file, 'wb') as firmware_file:
00478             firmware_file.write(data)
00479         stop = time.time()
00480         test_info.info("programming took %s s" % (stop - start))
00481         self.wait_for_remount(test_info)
00482 
00483         # Check the CRC
00484         self.set_mode(self.MODE_IF, test_info)
00485         if DaplinkBoard.KEY_IF_CRC not in self.details_txt:
00486             test_info.failure("No interface CRC in details.txt")
00487             return
00488         details_crc = int(self.details_txt[DaplinkBoard.KEY_IF_CRC], 0)
00489         test_info.info("Interface crc: 0x%x" % details_crc)
00490         if data_crc != details_crc:
00491             test_info.failure("Interface CRC is wrong")
00492 
00493     def load_bootloader(self, filepath, parent_test):
00494         """Load a bootloader binary or hex"""
00495         assert isinstance(filepath, str), "Invalid bootloader image!"
00496         assert isinstance(parent_test, TestInfo), "Invalid parent test object!"
00497 
00498         test_info = parent_test.create_subtest('load_bootloader')
00499         self.set_mode(self.MODE_IF, test_info)
00500 
00501         # Check image CRC
00502         data_crc, crc_in_image = _compute_crc(filepath)
00503         assert data_crc == crc_in_image, ("CRC in bootloader is wrong "
00504                                           "expected 0x%x, found 0x%x" %
00505                                           (data_crc, crc_in_image))
00506 
00507         filename = os.path.basename(filepath)
00508         with open(filepath, 'rb') as firmware_file:
00509             data = firmware_file.read()
00510         out_file = self.get_file_path(filename)
00511         start = time.time()
00512         with open(out_file, 'wb') as firmware_file:
00513             firmware_file.write(data)
00514         stop = time.time()
00515         test_info.info("programming took %s s" % (stop - start))
00516         self.wait_for_remount(test_info)
00517 
00518         # Check the CRC
00519         self.set_mode(self.MODE_IF, test_info)
00520         if DaplinkBoard.KEY_BL_CRC not in self.details_txt:
00521             test_info.failure("No bootloader CRC in details.txt")
00522             return
00523         details_crc = int(self.details_txt[DaplinkBoard.KEY_BL_CRC], 0)
00524         test_info.info("Bootloader crc: 0x%x" % details_crc)
00525         if data_crc != details_crc:
00526             test_info.failure("Bootloader CRC is wrong")
00527 
00528     def wait_for_remount(self, parent_test, wait_time=600):
00529         mode = self._mode
00530         count = self._remount_count
00531         test_info = parent_test.create_subtest('wait_for_remount')
00532 
00533         elapsed = 0
00534         start = time.time()
00535         remounted = False
00536         while os.path.isdir(self.mount_point):
00537             if self.update_board_info(False): #check info if it is already mounted
00538                 if mode is not None and self._mode is not None and mode is not self._mode:
00539                     remounted = True
00540                     test_info.info("already remounted with change mode")
00541                     break
00542                 elif count is not None and self._remount_count is not None and count != self._remount_count:
00543                         remounted = True
00544                         test_info.info("already remounted with change mount count")
00545                         break
00546             if elapsed > wait_time:
00547                 raise Exception("Dismount timed out")
00548             time.sleep(0.1)
00549             elapsed += 0.2
00550         else:
00551             stop = time.time()
00552             test_info.info("unmount took %s s" % (stop - start))
00553         elapsed = 0
00554         start = time.time()
00555 
00556         while not remounted:
00557             if self.update_board_info(False):
00558                 if os.path.isdir(self.mount_point):
00559                     # Information returned by mbed-ls could be old.
00560                     # Only break from the loop if the second call to
00561                     # mbed-ls returns the same mount point.
00562                     tmp_mount = self.mount_point
00563                     if self.update_board_info(False):
00564                         if tmp_mount == self.mount_point:
00565                             break
00566             if elapsed > wait_time:
00567                 raise Exception("Mount timed out")
00568             time.sleep(0.1)
00569             elapsed += 0.1
00570         stop = time.time()
00571         test_info.info("mount took %s s" % (stop - start))
00572 
00573         if count is not None and self._remount_count is not None:
00574             expected_count = (0 if mode is not self._mode
00575                               else (count + 1) & 0xFFFFFFFF)
00576             if expected_count != self._remount_count:
00577                     test_info.failure('Expected remount count of %s got %s' %
00578                                       (expected_count, self._remount_count))
00579 
00580         # If enabled check the filesystem
00581         if self._check_fs_on_remount:
00582             self.test_fs(parent_test)
00583             self.test_fs_contents(parent_test)
00584             self.test_details_txt(parent_test)
00585             if self._manage_assert:
00586                 if self._assert is not None:
00587                     test_info.failure('Assert on line %s in file %s' %
00588                                       (self._assert.line, self._assert.file))
00589                 self.clear_assert()
00590 
00591     def update_board_info(self, exptn_on_fail=True):
00592         """Update board info
00593 
00594         Update all board information variables that could
00595         change when remounting or changing modes.
00596         Note - before this function is set self.unique_id
00597         must be set.
00598         """
00599 
00600         try:
00601             endpoints = _get_board_endpoints(self.unique_id)
00602             if endpoints is None:
00603                 if exptn_on_fail:
00604                     raise Exception("Could not update board info: %s" %
00605                                     self.unique_id)
00606                 return False
00607             self.unique_id, self.serial_port, self.mount_point = endpoints
00608             # Serial port can be missing
00609             if self.unique_id is None:
00610                 if exptn_on_fail:
00611                     raise Exception("Mount point is null")
00612                 return False
00613             if self.mount_point is None:
00614                 if exptn_on_fail:
00615                     raise Exception("Mount point is null")
00616                 return False
00617             self.board_id = int(self.unique_id[0:4], 16)
00618             self._hic_id = int(self.unique_id[-8:], 16)
00619 
00620             # Note - Some legacy boards might not have details.txt
00621             details_txt_path = self.get_file_path("details.txt")
00622             self.details_txt = _parse_kvp_file(details_txt_path)
00623             self._parse_assert_txt()
00624 
00625             self._remount_count = None
00626             if DaplinkBoard.KEY_REMOUNT_COUNT in self.details_txt:
00627                 self._remount_count = int(self.details_txt[DaplinkBoard.KEY_REMOUNT_COUNT])
00628             self._mode = None
00629             if DaplinkBoard.KEY_MODE in self.details_txt:
00630                 DETAILS_TO_MODE = {
00631                     "interface": DaplinkBoard.MODE_IF,
00632                     "bootloader": DaplinkBoard.MODE_BL,
00633                 }
00634                 mode_str = self.details_txt[DaplinkBoard.KEY_MODE]
00635                 self._mode = DETAILS_TO_MODE[mode_str]
00636             else:
00637                 #check for race condition here
00638                 return False
00639             return True
00640         except Exception as e:
00641             if exptn_on_fail:
00642                 raise e
00643             else:
00644                 return False
00645 
00646     def test_details_txt(self, parent_test):
00647         """Check that details.txt has all requied fields"""
00648         test_info = parent_test.create_subtest('test_details_txt')
00649         required_key_and_format = {
00650             DaplinkBoard.KEY_UNIQUE_ID: re.compile("^[a-f0-9]{48}$"),
00651             DaplinkBoard.KEY_HIC_ID: re.compile("^[a-f0-9]{8}$"),
00652             DaplinkBoard.KEY_GIT_SHA: re.compile("^[a-f0-9]{40}$"),
00653             DaplinkBoard.KEY_LOCAL_MODS: re.compile("^[01]{1}$"),
00654             DaplinkBoard.KEY_USB_INTERFACES: re.compile("^.+$"),
00655             DaplinkBoard.KEY_MODE: re.compile("(interface|bootloader)"),
00656         }
00657         optional_key_and_format = {
00658             DaplinkBoard.KEY_BL_VERSION: re.compile("^[0-9]{4}$"),
00659             DaplinkBoard.KEY_IF_VERSION: re.compile("^[0-9]{4}$"),
00660             DaplinkBoard.KEY_BL_CRC: re.compile("^0x[a-f0-9]{8}$"),
00661             DaplinkBoard.KEY_IF_CRC: re.compile("^0x[a-f0-9]{8}$"),
00662         }
00663         # 1. keys and values are alphanumeric
00664         # 2. no duplicate keys
00665         # 3. format is key : value
00666         # 4. required keys are present
00667         # 5. optional keys have the expected format
00668         details_txt_path = self.get_file_path("details.txt")
00669         details_txt = _parse_kvp_file(details_txt_path, test_info)
00670         if not details_txt:
00671             test_info.failure("Could not parse details.txt")
00672             return
00673 
00674         # Check for required keys
00675         for key in required_key_and_format:
00676             if key not in details_txt:
00677                 test_info.failure("Missing detail.txt entry: %s" % key)
00678                 continue
00679 
00680             value = details_txt[key]
00681             pattern = required_key_and_format[key]
00682             if pattern.match(value) is None:
00683                 test_info.failure("Bad format detail.txt %s: %s" %
00684                                   (key, value))
00685 
00686         # Check format of optional values
00687         for key in optional_key_and_format:
00688             if key not in details_txt:
00689                 continue
00690 
00691             value = details_txt[key]
00692             pattern = optional_key_and_format[key]
00693             if pattern.match(value) is None:
00694                 test_info.failure("Bad format detail.txt %s: %s" %
00695                                   (key, value))
00696 
00697         # Check details.txt contents
00698         details_unique_id = None
00699         details_hic_id = None
00700         if DaplinkBoard.KEY_UNIQUE_ID in details_txt:
00701             details_unique_id = details_txt[DaplinkBoard.KEY_UNIQUE_ID]
00702         if DaplinkBoard.KEY_HIC_ID in details_txt:
00703             details_hic_id = details_txt[DaplinkBoard.KEY_HIC_ID]
00704         if details_unique_id is not None:
00705             if details_unique_id != self.unique_id:
00706                 test_info.failure("Unique ID mismatch in details.txt "
00707                                   "details.txt=%s, usb=%s" %
00708                                   (details_unique_id, self.unique_id))
00709             if details_hic_id is not None:
00710                 usb_hic = details_unique_id[-8:]
00711                 if details_hic_id != usb_hic:
00712                     test_info.failure("HIC ID is not the last 8 "
00713                                       "digits of unique ID "
00714                                       "details.txt=%s, usb=%s" %
00715                                       (details_hic_id, usb_hic))
00716 
00717     def _parse_assert_txt(self):
00718         file_path = self.get_file_path("ASSERT.TXT")
00719         if not os.path.isfile(file_path):
00720             self._assert = None
00721             return
00722 
00723         assert_table = _parse_kvp_file(file_path)
00724         assert "file" in assert_table
00725         assert "line" in assert_table
00726 
00727         self._assert = AssertInfo(assert_table["file"], assert_table['line'])