Repostiory containing DAPLink source code with Reset Pin workaround for HANI_IOT board.

Upstream: https://github.com/ARMmbed/DAPLink

test/daplink_board.py

Committer:
Pawel Zarembski
Date:
2020-04-07
Revision:
0:01f31e923fe2

File content as of revision 0:01f31e923fe2:

#
# DAPLink Interface Firmware
# Copyright (c) 2009-2016, ARM Limited, All Rights Reserved
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import absolute_import

import os
import re
import time
import subprocess
import sys
import binascii
import itertools
import mbed_lstools
import info
import test_daplink
from test_info import TestInfoStub, TestInfo
from intelhex import IntelHex
from pyocd.core.helpers import ConnectHelper

FILE_IGNORE_PATTERN_LIST = [
    re.compile("\\._\\.Trashes")
]


# This prevents the following error message from getting
# displayed on windows if the mbed dismounts unexpectedly
# during a transfer:
#   There is no disk in the drive. Please insert a disk into
#   drive \Device\<Harddiskx>\<rdrive>
def disable_popup():
    if sys.platform.startswith("win"):
        # pylint: disable=invalid-name
        import ctypes
        SEM_FAILCRITICALERRORS = 1
        GetErrorMode = \
            ctypes.windll.kernel32.GetErrorMode  # @UndefinedVariable
        GetErrorMode.restype = ctypes.c_uint
        GetErrorMode.argtypes = []
        SetErrorMode = \
            ctypes.windll.kernel32.SetErrorMode  # @UndefinedVariable
        SetErrorMode.restype = ctypes.c_uint
        SetErrorMode.argtypes = [ctypes.c_uint]

        err_mode = GetErrorMode()
        err_mode |= SEM_FAILCRITICALERRORS
        SetErrorMode(err_mode)

disable_popup()


def get_all_attached_daplink_boards():
    all_boards = []
    lstools = mbed_lstools.create()
    mbed_list = lstools.list_mbeds()
    for mbed in mbed_list:
        unique_id = mbed['target_id']
        board = DaplinkBoard(unique_id)
        if board._mode is not None: #Valid daplink should have set this mode
            all_boards.append(board)
        else:
            print("Warning: DAPLink tests cannot be done on board %s" % board.unique_id)
    return all_boards


def _unique_id_to_host_id(unique_id):
    """Return the chip id unique to the daplink host procesor

    Unique ID has the following fomat
    Board ID - 4 bytes
    Version  - 4 bytes
    Host ID  - Everything else
    """
    return unique_id[8:8 + 32]


def _get_board_endpoints(unique_id):
    """Return a tuple of unique_id, serial_port, mount_point"""
    lstools = mbed_lstools.create()
    mbed_list = lstools.list_mbeds()

    host_id = _unique_id_to_host_id(unique_id)
    for mbed in mbed_list:
        mbed_unique_id = mbed['target_id']
        mbed_serial_port = mbed['serial_port']
        mbed_mount_point = mbed['mount_point']
        mbed_host_id = _unique_id_to_host_id(mbed_unique_id)
        if mbed_host_id == host_id:
            return mbed_unique_id, mbed_serial_port, mbed_mount_point
    return None


def _ranges(i):
    for _, b in itertools.groupby(enumerate(i), lambda x_y: x_y[1] - x_y[0]):
        b = list(b)
        yield b[0][1], b[-1][1]


def _parse_kvp_file(file_path, parent_test=None):
    """Parse details.txt and return True if successful"""
    test_info = None
    kvp = {}
    if parent_test is not None:
        test_info = parent_test.create_subtest('parse_kvp_file')
    line_format = re.compile("^([a-zA-Z0-9 ]+): +(.+)$")
    if not os.path.isfile(file_path):
        return kvp

    with open(file_path, "r") as file_handle:
        for line in file_handle:
            if len(line) <= 0:
                if test_info is not None:
                    test_info.failure("Empty line in %s" % file_path)
                continue

            if line[0] == '#':
                # The line is a comment
                continue

            match = line_format.match(line)
            if match is None:
                if test_info is not None:
                    test_info.failure("Invalid line: %s" % line)
                continue

            key = match.group(1)
            key = key.lower().replace(" ", "_")
            value = match.group(2)
            value = value.lower()
            value = value.strip()
            if key in kvp:
                if test_info is not None:
                    test_info.failure("Duplicate key %s" % key)
                continue
            kvp[key] = value
    return kvp


def _compute_crc(hex_file_path):
    # Read in hex file
    new_hex_file = IntelHex()
    new_hex_file.padding = 0xFF
    new_hex_file.fromfile(hex_file_path, format='hex')

    # Get the starting and ending address
    addresses = new_hex_file.addresses()
    addresses.sort()
    start_end_pairs = list(_ranges(addresses))
    regions = len(start_end_pairs)
    assert regions == 1, ("Error - only 1 region allowed in "
                          "hex file %i found." % regions)
    start, end = start_end_pairs[0]

    # Compute checksum over the range (don't include data at location of crc)
    size = end - start + 1
    crc_size = size - 4
    data = new_hex_file.tobinarray(start=start, size=crc_size)
    data_crc32 = binascii.crc32(data) & 0xFFFFFFFF

    # Grab the crc from the image
    embedded_crc32 = (((new_hex_file[end - 3] & 0xFF) << 0) |
                      ((new_hex_file[end - 2] & 0xFF) << 8) |
                      ((new_hex_file[end - 1] & 0xFF) << 16) |
                      ((new_hex_file[end - 0] & 0xFF) << 24))
    return data_crc32, embedded_crc32


def _run_chkdsk(drive):
    args = ["chkdsk", drive]
    process = subprocess.Popen(args, stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    process.communicate(input=bytearray('n\r\n',encoding='ascii'))  # Answer no if prompted
    process.wait()
    return process.returncode


class AssertInfo(object):

    def __init__(self, file_name, line_number):
        self._file = file_name
        self._line = line_number

    @property
    def file(self):
        return self._file

    @property
    def line(self):
        return self._line


class DaplinkBoard(object):

    MODE_IF = "interface"
    MODE_BL = "bootloader"

    # Keys for details.txt
    KEY_UNIQUE_ID = "unique_id"
    KEY_HIC_ID = "hic_id"
    KEY_MODE = "daplink_mode"
    KEY_BL_VERSION = "bootloader_version"
    KEY_IF_VERSION = "interface_version"
    KEY_GIT_SHA = "git_sha"
    KEY_LOCAL_MODS = "local_mods"
    KEY_USB_INTERFACES = "usb_interfaces"
    KEY_BL_CRC = "bootloader_crc"
    KEY_IF_CRC = "interface_crc"
    KEY_REMOUNT_COUNT = "remount_count"

    def __init__(self, unique_id):

        self.unique_id = unique_id
        self.details_txt = None
        self._mode = None
        self._remount_count = None
        self._assert = None
        self._check_fs_on_remount = False
        self._manage_assert = False
        self.update_board_info()

    def __str__(self):
        return "Name=%s Unique ID=%s" % (self.name, self.get_unique_id())

    def get_unique_id(self):
        return self.unique_id

    def get_board_id(self):
        return self.board_id

    @property
    def hic_id(self):
        return self._hic_id

    @property
    def name(self):
        if self.board_id in info.BOARD_ID_TO_BUILD_TARGET:
            board_target = info.BOARD_ID_TO_BUILD_TARGET[self.board_id]
        else:
            board_target = "Unknown"
        return board_target

    def get_serial_port(self):
        return self.serial_port

    def get_mount_point(self):
        return self.mount_point

    def get_connected(self):
        """Check if the board is connected"""
        return os.path.isdir(self.mount_point)

    def get_failure_message_and_type(self):
        """Get the failure message and types from fail.txt

        return None if there there is no failure
        """
        error = None
        error_type = None
        fail_file = self.get_file_path('FAIL.TXT')
        if not self.get_connected():
            raise Exception('Board not connected')
        if os.path.isfile(fail_file):
            with open(fail_file, 'r') as fail_file_handle:
                msg = fail_file_handle.read()
                lines = msg.splitlines()
                if len(lines) == 2:
                    if lines[0].startswith('error: '):
                        error = lines[0][7:]
                    else:
                        raise Exception('Can not parse error line in FAIL.TXT')
                    if lines[1].startswith('type: '):
                        error_type = lines[1][6:]
                    else:
                        raise Exception('Can not parse type line in FAIL.TXT')
                else:
                    raise Exception('Wrong number of lines in FAIL.TXT, expected: 2')
        return error, error_type

    def get_assert_info(self):
        """Return an AssertInfo if an assert occurred, else None"""
        return self._assert

    def get_mode(self):
        """Return either MODE_IF or MODE_BL"""
        assert ((self._mode is DaplinkBoard.MODE_BL) or
                (self._mode is DaplinkBoard.MODE_IF))
        return self._mode

    def get_file_path(self, file_name):
        """Convenience function to the path to a file on the drive"""
        return os.path.normpath(self.mount_point + os.sep + file_name)

    def refresh(self, parent_test):
        """Remount driver to get updated contents"""
        refresh_filename = self.get_file_path('REFRESH.ACT')
        with open(refresh_filename, 'wb') as _:
            pass
        self.wait_for_remount(parent_test)

    def set_mode(self, mode, parent_test=None):
        """Set the mode to either MODE_IF or MODE_BL"""
        assert ((mode is DaplinkBoard.MODE_BL) or
                (mode is DaplinkBoard.MODE_IF))
        if parent_test is None:
            parent_test = TestInfoStub()
        test_info = parent_test.create_subtest('set_mode')
        current_mode = self.get_mode()
        if current_mode is mode:
            # No mode change needed
            return

        if mode is self.MODE_BL:
            test_info.info("changing mode IF -> BL")
            # Create file to enter BL mode
            start_bl_path = self.get_file_path('START_BL.ACT')
            with open(start_bl_path, 'wb') as _: pass
        elif mode is self.MODE_IF:
            test_info.info("changing mode BL -> IF")
            # Create file to enter IF mode
            start_if_path = self.get_file_path('START_IF.ACT')
            with open(start_if_path, 'wb') as _: pass
        else:
            test_info.warning("Board is in unknown mode")
        self.wait_for_remount(test_info)

        new_mode = self.get_mode()
        if new_mode != mode:
            test_info.failure("Board in wrong mode: %s" % new_mode)
            raise Exception("Could not change board mode")

    def set_check_fs_on_remount(self, enabled):
        assert isinstance(enabled, bool)
        self._check_fs_on_remount = enabled
        self.set_assert_auto_manage(enabled)

    def set_assert_auto_manage(self, enabled):
        assert isinstance(enabled, bool)
        self.clear_assert()
        self._manage_assert = enabled

    def clear_assert(self):
        assert_path = self.get_file_path("ASSERT.TXT")
        if os.path.isfile(assert_path):
            os.remove(assert_path)
            self.wait_for_remount(TestInfoStub())

    def run_board_test(self, parent_test):
        test_daplink.daplink_test(self, parent_test)

    def read_target_memory(self, addr, size, resume=True):
        assert self.get_mode() == self.MODE_IF
        with ConnectHelper.session_with_chosen_probe(unique_id=self.get_unique_id(),
                                                     resume_on_disconnect=resume) as session:
            data = session.target.read_memory_block8(addr, size)
        return bytearray(data)

    def test_fs(self, parent_test):
        """Check if the raw filesystem is valid"""
        if sys.platform.startswith("win"):
            test_info = parent_test.create_subtest('test_fs')
            returncode = _run_chkdsk(self.mount_point)
            test_info.info('chkdsk returned %s' % returncode)
            if returncode != 0:
                test_info.failure('Disk corrupt')

            # Windows 8/10 workaround - rerun chkdsk until disk caching is on
            # Notes about this problem:
            # - This is less likely to occur when the "storage" service is
            #     turned off and/or you are running as administrator
            # - When creating a directory with os.mkdir the
            #     following error occurs: "WindowsError: [Error 1392] The
            #     file or directory is corrupted and unreadable: '<directory>'"
            # - When creating a file with open(<filename>, "wb") the
            #     following error occurs: "OError: [Errno 22] invalid
            #     mode ('wb') or filename: '<filename>'"
            # - When a file or directory is created on the drive in explorer
            #     and you preform a refresh, the newly created file or
            #     directory disappears
            persist_test_dir = self.get_file_path("persist_test_dir")
            for _ in range(10):
                try:
                    os.mkdir(persist_test_dir)
                except EnvironmentError as exception:
                    test_info.info("cache check exception %s" % exception)
                if os.path.exists(persist_test_dir):
                    os.rmdir(persist_test_dir)
                    break
                test_info.info("running checkdisk to re-enable caching")
                _run_chkdsk(self.mount_point)
            else:
                raise Exception("Unable to re-enable caching")

        # TODO - as a future improvement add linux and mac support

    # Tests for the following:
    # 1. Correct files present                -TODO
    # 2. Contents of file are valid ascii
    # 3. Each line ends with \r\n
    # 4. There is no whitespace at the end of the line
    # 5. Each file ends with \r\n
    def test_fs_contents(self, parent_test):
        """Check if the file contents are valid"""
        test_info = parent_test.create_subtest('test_fs_contents')
        non_ascii = b'[^\x20-\x7F\r\n]'
        non_cr_lf = b'\r[^\n]|[^\r]\n'
        trail_white = b'(?:\ \r|\ \n)'
        end_of_file = b'\r\n$'
        files = os.listdir(self.mount_point)
        non_ascii_re = re.compile(non_ascii)
        non_cr_lf_re = re.compile(non_cr_lf)
        trail_white_re = re.compile(trail_white)
        end_of_file_re = re.compile(end_of_file)
        for filename in files:
            filepath = self.get_file_path(filename)
            if not os.path.isfile(filepath):
                test_info.info("Skipping non file item %s" % filepath)
                continue
            skip = False
            for pattern in FILE_IGNORE_PATTERN_LIST:
                if pattern.match(filename):
                    skip = True
                    break
            if skip:
                continue

            with open(filepath, 'rb') as file_handle:
                file_contents = file_handle.read()
            if non_ascii_re.search(file_contents):
                test_info.failure("Non ascii characters in %s" % filepath)
            elif non_cr_lf_re.search(file_contents):
                test_info.failure("File has non-standard line endings %s" %
                                  filepath)
            elif trail_white_re.search(file_contents):
                test_info.warning("File trailing whitespace %s" %
                                  filepath)
            elif end_of_file_re.search(file_contents) is None:
                test_info.warning("No newline at end of file %s" %
                                  filepath)
            else:
                test_info.info("File %s valid" % filepath)

        self.test_details_txt(test_info)

    def load_interface(self, filepath, parent_test):
        """Load an interface binary or hex"""
        assert isinstance(filepath, str), "Invalid bootloader image!"
        assert isinstance(parent_test, TestInfo), "Invalid parent test object!"

        test_info = parent_test.create_subtest('load_interface')
        self.set_mode(self.MODE_BL, test_info)

        data_crc, crc_in_image = _compute_crc(filepath)
        assert data_crc == crc_in_image, ("CRC in interface is wrong "
                                          "expected 0x%x, found 0x%x" %
                                          (data_crc, crc_in_image))

        filename = os.path.basename(filepath)
        with open(filepath, 'rb') as firmware_file:
            data = firmware_file.read()
        out_file = self.get_file_path(filename)
        start = time.time()
        with open(out_file, 'wb') as firmware_file:
            firmware_file.write(data)
        stop = time.time()
        test_info.info("programming took %s s" % (stop - start))
        self.wait_for_remount(test_info)

        # Check the CRC
        self.set_mode(self.MODE_IF, test_info)
        if DaplinkBoard.KEY_IF_CRC not in self.details_txt:
            test_info.failure("No interface CRC in details.txt")
            return
        details_crc = int(self.details_txt[DaplinkBoard.KEY_IF_CRC], 0)
        test_info.info("Interface crc: 0x%x" % details_crc)
        if data_crc != details_crc:
            test_info.failure("Interface CRC is wrong")

    def load_bootloader(self, filepath, parent_test):
        """Load a bootloader binary or hex"""
        assert isinstance(filepath, str), "Invalid bootloader image!"
        assert isinstance(parent_test, TestInfo), "Invalid parent test object!"

        test_info = parent_test.create_subtest('load_bootloader')
        self.set_mode(self.MODE_IF, test_info)

        # Check image CRC
        data_crc, crc_in_image = _compute_crc(filepath)
        assert data_crc == crc_in_image, ("CRC in bootloader is wrong "
                                          "expected 0x%x, found 0x%x" %
                                          (data_crc, crc_in_image))

        filename = os.path.basename(filepath)
        with open(filepath, 'rb') as firmware_file:
            data = firmware_file.read()
        out_file = self.get_file_path(filename)
        start = time.time()
        with open(out_file, 'wb') as firmware_file:
            firmware_file.write(data)
        stop = time.time()
        test_info.info("programming took %s s" % (stop - start))
        self.wait_for_remount(test_info)

        # Check the CRC
        self.set_mode(self.MODE_IF, test_info)
        if DaplinkBoard.KEY_BL_CRC not in self.details_txt:
            test_info.failure("No bootloader CRC in details.txt")
            return
        details_crc = int(self.details_txt[DaplinkBoard.KEY_BL_CRC], 0)
        test_info.info("Bootloader crc: 0x%x" % details_crc)
        if data_crc != details_crc:
            test_info.failure("Bootloader CRC is wrong")

    def wait_for_remount(self, parent_test, wait_time=600):
        mode = self._mode
        count = self._remount_count
        test_info = parent_test.create_subtest('wait_for_remount')

        elapsed = 0
        start = time.time()
        remounted = False
        while os.path.isdir(self.mount_point):
            if self.update_board_info(False): #check info if it is already mounted
                if mode is not None and self._mode is not None and mode is not self._mode:
                    remounted = True
                    test_info.info("already remounted with change mode")
                    break
                elif count is not None and self._remount_count is not None and count != self._remount_count:
                        remounted = True
                        test_info.info("already remounted with change mount count")
                        break
            if elapsed > wait_time:
                raise Exception("Dismount timed out")
            time.sleep(0.1)
            elapsed += 0.2
        else:
            stop = time.time()
            test_info.info("unmount took %s s" % (stop - start))
        elapsed = 0
        start = time.time()

        while not remounted:
            if self.update_board_info(False):
                if os.path.isdir(self.mount_point):
                    # Information returned by mbed-ls could be old.
                    # Only break from the loop if the second call to
                    # mbed-ls returns the same mount point.
                    tmp_mount = self.mount_point
                    if self.update_board_info(False):
                        if tmp_mount == self.mount_point:
                            break
            if elapsed > wait_time:
                raise Exception("Mount timed out")
            time.sleep(0.1)
            elapsed += 0.1
        stop = time.time()
        test_info.info("mount took %s s" % (stop - start))

        if count is not None and self._remount_count is not None:
            expected_count = (0 if mode is not self._mode
                              else (count + 1) & 0xFFFFFFFF)
            if expected_count != self._remount_count:
                    test_info.failure('Expected remount count of %s got %s' %
                                      (expected_count, self._remount_count))

        # If enabled check the filesystem
        if self._check_fs_on_remount:
            self.test_fs(parent_test)
            self.test_fs_contents(parent_test)
            self.test_details_txt(parent_test)
            if self._manage_assert:
                if self._assert is not None:
                    test_info.failure('Assert on line %s in file %s' %
                                      (self._assert.line, self._assert.file))
                self.clear_assert()

    def update_board_info(self, exptn_on_fail=True):
        """Update board info

        Update all board information variables that could
        change when remounting or changing modes.
        Note - before this function is set self.unique_id
        must be set.
        """

        try:
            endpoints = _get_board_endpoints(self.unique_id)
            if endpoints is None:
                if exptn_on_fail:
                    raise Exception("Could not update board info: %s" %
                                    self.unique_id)
                return False
            self.unique_id, self.serial_port, self.mount_point = endpoints
            # Serial port can be missing
            if self.unique_id is None:
                if exptn_on_fail:
                    raise Exception("Mount point is null")
                return False
            if self.mount_point is None:
                if exptn_on_fail:
                    raise Exception("Mount point is null")
                return False
            self.board_id = int(self.unique_id[0:4], 16)
            self._hic_id = int(self.unique_id[-8:], 16)

            # Note - Some legacy boards might not have details.txt
            details_txt_path = self.get_file_path("details.txt")
            self.details_txt = _parse_kvp_file(details_txt_path)
            self._parse_assert_txt()

            self._remount_count = None
            if DaplinkBoard.KEY_REMOUNT_COUNT in self.details_txt:
                self._remount_count = int(self.details_txt[DaplinkBoard.KEY_REMOUNT_COUNT])
            self._mode = None
            if DaplinkBoard.KEY_MODE in self.details_txt:
                DETAILS_TO_MODE = {
                    "interface": DaplinkBoard.MODE_IF,
                    "bootloader": DaplinkBoard.MODE_BL,
                }
                mode_str = self.details_txt[DaplinkBoard.KEY_MODE]
                self._mode = DETAILS_TO_MODE[mode_str]
            else:
                #check for race condition here
                return False
            return True
        except Exception as e:
            if exptn_on_fail:
                raise e
            else:
                return False

    def test_details_txt(self, parent_test):
        """Check that details.txt has all requied fields"""
        test_info = parent_test.create_subtest('test_details_txt')
        required_key_and_format = {
            DaplinkBoard.KEY_UNIQUE_ID: re.compile("^[a-f0-9]{48}$"),
            DaplinkBoard.KEY_HIC_ID: re.compile("^[a-f0-9]{8}$"),
            DaplinkBoard.KEY_GIT_SHA: re.compile("^[a-f0-9]{40}$"),
            DaplinkBoard.KEY_LOCAL_MODS: re.compile("^[01]{1}$"),
            DaplinkBoard.KEY_USB_INTERFACES: re.compile("^.+$"),
            DaplinkBoard.KEY_MODE: re.compile("(interface|bootloader)"),
        }
        optional_key_and_format = {
            DaplinkBoard.KEY_BL_VERSION: re.compile("^[0-9]{4}$"),
            DaplinkBoard.KEY_IF_VERSION: re.compile("^[0-9]{4}$"),
            DaplinkBoard.KEY_BL_CRC: re.compile("^0x[a-f0-9]{8}$"),
            DaplinkBoard.KEY_IF_CRC: re.compile("^0x[a-f0-9]{8}$"),
        }
        # 1. keys and values are alphanumeric
        # 2. no duplicate keys
        # 3. format is key : value
        # 4. required keys are present
        # 5. optional keys have the expected format
        details_txt_path = self.get_file_path("details.txt")
        details_txt = _parse_kvp_file(details_txt_path, test_info)
        if not details_txt:
            test_info.failure("Could not parse details.txt")
            return

        # Check for required keys
        for key in required_key_and_format:
            if key not in details_txt:
                test_info.failure("Missing detail.txt entry: %s" % key)
                continue

            value = details_txt[key]
            pattern = required_key_and_format[key]
            if pattern.match(value) is None:
                test_info.failure("Bad format detail.txt %s: %s" %
                                  (key, value))

        # Check format of optional values
        for key in optional_key_and_format:
            if key not in details_txt:
                continue

            value = details_txt[key]
            pattern = optional_key_and_format[key]
            if pattern.match(value) is None:
                test_info.failure("Bad format detail.txt %s: %s" %
                                  (key, value))

        # Check details.txt contents
        details_unique_id = None
        details_hic_id = None
        if DaplinkBoard.KEY_UNIQUE_ID in details_txt:
            details_unique_id = details_txt[DaplinkBoard.KEY_UNIQUE_ID]
        if DaplinkBoard.KEY_HIC_ID in details_txt:
            details_hic_id = details_txt[DaplinkBoard.KEY_HIC_ID]
        if details_unique_id is not None:
            if details_unique_id != self.unique_id:
                test_info.failure("Unique ID mismatch in details.txt "
                                  "details.txt=%s, usb=%s" %
                                  (details_unique_id, self.unique_id))
            if details_hic_id is not None:
                usb_hic = details_unique_id[-8:]
                if details_hic_id != usb_hic:
                    test_info.failure("HIC ID is not the last 8 "
                                      "digits of unique ID "
                                      "details.txt=%s, usb=%s" %
                                      (details_hic_id, usb_hic))

    def _parse_assert_txt(self):
        file_path = self.get_file_path("ASSERT.TXT")
        if not os.path.isfile(file_path):
            self._assert = None
            return

        assert_table = _parse_kvp_file(file_path)
        assert "file" in assert_table
        assert "line" in assert_table

        self._assert = AssertInfo(assert_table["file"], assert_table['line'])