Repostiory containing DAPLink source code with Reset Pin workaround for HANI_IOT board.

Upstream: https://github.com/ARMmbed/DAPLink

Revision:
0:01f31e923fe2
diff -r 000000000000 -r 01f31e923fe2 test/usb_test.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/usb_test.py	Tue Apr 07 12:55:42 2020 +0200
@@ -0,0 +1,440 @@
+#
+# DAPLink Interface Firmware
+# Copyright (c) 2016-2016, ARM Limited, All Rights Reserved
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+""" Module for low level and targeted USB tests"""
+
+from __future__ import print_function
+
+import os
+import usb.core
+import functools
+import threading
+import time
+from usb_cdc import USBCdc
+from usb_hid import USBHid
+from usb_msd import USBMsd, Fat
+
+DISMOUNT_TIME_S = 10.00
+
+
+def test_usb(workspace, parent_test, force=False):
+    """Run raw USB tests
+
+    Requirements:
+        -daplink-validation must be loaded for the target.
+    """
+
+    # Only test on supported platforms
+    if not _platform_supports_usb_test() and not force:
+        parent_test.info("Skipping USB test on this platform")
+        return
+    test_info = parent_test.create_subtest("USB test")
+
+    # Find the device under test
+    serial_number = workspace.board.get_unique_id()
+    dev = _daplink_from_serial_number(serial_number)
+    if dev is None:
+        test_info.failure("Could not find board with serial number %s" %
+                          serial_number)
+        return
+
+    # Create wrappers for and acquire exclusive access to interfaces
+    cdc = USBCdc(dev)
+    hid = USBHid(dev)
+    msd = USBMsd(dev)
+    cdc.lock()
+    hid.lock()
+    msd.lock()
+
+    try:
+
+        for usb_test_on in (False, True):
+
+            _set_usb_test_mode(hid, usb_test_on)
+
+            test_cdc(test_info, cdc)
+
+            test_hid(test_info, hid)
+
+            test_msd(test_info, msd)
+
+            test_msd_stall(test_info, msd)
+
+            test_control(test_info, dev, cdc, hid, msd)
+
+            test_all(test_info, cdc, hid, msd)
+
+            # TODO - future enhancements
+            #  MSD remount + hid
+            #  STALL IN and STALL OUT
+
+    finally:
+        try:
+            _set_usb_test_mode(hid, False)
+        except usb.core.USBError:
+            pass
+        cdc.unlock()
+        hid.unlock()
+        msd.unlock()
+
+
+def main():
+    """Run the usb test as a stand alone program"""
+
+    import test_info
+    import mock
+
+    def get_unique_id(unique_id):
+        """Mock function to return a unique id"""
+        return unique_id
+
+    dev_list = usb.core.find(find_all=True, custom_match=_daplink_match)
+    for dev in dev_list:
+        board_id = dev.serial_number
+        print("Testing board %s" % board_id)
+        print("----------------")
+        mock_ws = mock.Mock()
+        mock_ws.board = mock.Mock()
+        mock_ws.board.unique_id = dev.serial_number
+        mock_ws.board.get_unique_id = functools.partial(get_unique_id,
+                                                        board_id)
+        test_usb(mock_ws, test_info.TestInfoStub(), True)
+
+
+def test_cdc(test_info, cdc):
+    """Smoke test of the CDC endpoint"""
+    cdc.set_line_coding(115200)
+    baud, fmt, parity, databits = cdc.get_line_coding()
+    test_info.info("Baud %i, fmt %i, parity %i, databits %i" %
+                   (baud, fmt, parity, databits))
+    cdc.send_break(cdc.SEND_BREAK_ON)
+    cdc.send_break(cdc.SEND_BREAK_OFF)
+    data = cdc.read(1024)
+    test_info.info("Serial port data: %s" % bytearray(data))
+    cdc.write("Hello world")
+    data = cdc.read(1024)
+    test_info.info("Serial port data2: %s" % bytearray(data))
+
+
+def test_hid(test_info, hid):
+    """Smoke test of the HID endpoint"""
+    hid.set_idle()
+    report = hid.get_descriptor(hid.DESC_TYPE_REPORT, 0)
+    test_info.info("Report descriptor: %s" % report)
+    # Send CMSIS-DAP vendor command to get the serial number
+    data = bytearray(64)
+    data[0] = 0x80
+    hid.set_report(data)
+    resp = hid.get_report(64)
+    length = resp[1]
+    test_info.info("CMSIS-DAP response: %s" %
+                   bytearray(resp[1:1 + length]).decode("utf-8"))
+
+
+def test_msd(test_info, msd):
+    """MSD endpoint tests"""
+
+    # Simple read
+    mbr = msd.scsi_read10(0, 1)
+    test_info.info("MBR[0:16]: %s" % mbr[0:16])
+
+    # Test FAT filesystem
+    fat = Fat(msd)
+    print(fat.mbr)
+
+    # Grab entries in the root directory
+    root_dir = fat.root_dir
+    for entry in root_dir:
+        if entry["DIR_Name"][0] != "\0":
+            print(entry)
+
+    # Trigger a remount
+    dir_idx = root_dir.find_free_entry_index()
+    root_dir[dir_idx]["DIR_Name"] = "REFRESH ACT"
+    root_dir_data = root_dir.pack()
+    msd.scsi_write10(root_dir.sector, root_dir_data)
+
+    test_info.info("Added file to root directory")
+    start = time.time()
+    while time.time() - start < DISMOUNT_TIME_S:
+        try:
+            msd.scsi_read10(0, 1)
+        except msd.SCSIError:
+            test_info.info("Dismount detected")
+            break
+    else:
+        test_info.failure("Device failed to dismount")
+
+    start = time.time()
+    while time.time() - start < DISMOUNT_TIME_S:
+        try:
+            msd.scsi_read10(0, 1)
+            test_info.info("Mount detected")
+            break
+        except msd.SCSIError:
+            pass
+    else:
+        test_info.failure("Device failed to mount")
+
+
+def test_msd_stall(test_info, msd):
+    """Test stalls coming at various times in the middle of MSD xfers"""
+    fat = Fat(msd)
+    root_dir = fat.root_dir
+    dir_idx = root_dir.find_free_entry_index()
+    root_dir[dir_idx]["DIR_Name"] = "REFRESH ACT"
+    root_dir_data = root_dir.pack()
+
+    # Test that a write fails if media is removed after the CBW
+    # stage but before the CSW stage
+    msd.scsi_write10(root_dir.sector, root_dir_data)
+    msd.delay_cbw_to_data = 1.0
+    retval = msd.CSW_STATUS_PASSED
+    try:
+        msd.scsi_write10(0, bytearray(512))
+        test_info.failure("Device failed to stall data stage")
+    except msd.SCSIError as error:
+        retval = error.value
+    msd.delay_cbw_to_data = 0
+    # Make sure device still works as expected
+    time.sleep(3)
+    msd.scsi_read10(0, 1)
+    msd.scsi_write10(0, bytearray(512))
+    if retval == msd.CSW_STATUS_FAILED:
+        test_info.info("Test CBW,Stall,Data OUT - Pass")
+    else:
+        test_info.failure("Device returned wrong status")
+
+    # Test that a write succeeds even if media is removed
+    # after the OUT stage but before the CSW stage
+    msd.scsi_write10(root_dir.sector, root_dir_data)
+    msd.delay_data_to_csw = 1.0
+    msd.scsi_write10(0, bytearray(512))
+    msd.delay_data_to_csw = 0
+    # Make sure device still works as expected
+    time.sleep(3)
+    msd.scsi_read10(0, 1)
+    msd.scsi_write10(0, bytearray(512))
+    test_info.info("Test DATA OUT,Stall,CSW - Pass")
+
+    # Test that a read succeeds even if media is removed
+    # after the IN stage but before the CSW stage
+    msd.scsi_write10(root_dir.sector, root_dir_data)
+    msd.delay_data_to_csw = 1.0
+    resp = msd.scsi_read10(0, 1)
+    assert len(resp) == 512
+    msd.delay_data_to_csw = 0
+    # Make sure device still works as expected
+    time.sleep(3)
+    msd.scsi_read10(0, 1)
+    msd.scsi_write10(0, bytearray(512))
+    test_info.info("Test DATA IN,Stall,CSW - Pass")
+
+    # Test that a test unit ready succeeds even if media is removed
+    # after the CBW stage but before the CSW stage
+    msd.scsi_write10(root_dir.sector, root_dir_data)
+    msd.delay_data_to_csw = 1.0
+    resp = msd.scsi_test_unit_ready()
+    msd.delay_data_to_csw = 0
+    # Make sure device still works as expected
+    time.sleep(3)
+    msd.scsi_read10(0, 1)
+    msd.scsi_write10(0, bytearray(512))
+    if resp == msd.CSW_STATUS_PASSED:
+        test_info.info("Test CBW,Stall,CSW - Pass")
+    else:
+        test_info.failure("Test CBW,Stall,CSW - Failed")
+
+    # Test that a test unit ready succeeds even if media is removed
+    # after the CBW stage but before the CSW stage
+    msd.scsi_write10(root_dir.sector, root_dir_data)
+    time.sleep(1.0)
+    resp = msd.scsi_test_unit_ready()
+    # Make sure device still works as expected
+    time.sleep(3)
+    msd.scsi_read10(0, 1)
+    msd.scsi_write10(0, bytearray(512))
+    if resp == msd.CSW_STATUS_FAILED:
+        test_info.info("Test CBW,Stall,CSW - Pass")
+    else:
+        test_info.failure("Test CBW,Stall,CSW - Failed")
+
+
+def test_control(test_info, dev, cdc, hid, msd):
+    """Test for the control endpoint"""
+
+    test_info.info("testing control transfer with size a multiple of 256")
+    request_type = 0x80
+    request = 0x06            # Get descriptor
+    value = 0x200              # Configuration descriptor
+    index = 0                  # Always 0 for this request
+    resp = dev.ctrl_transfer(request_type, request, value, index, 256)
+    assert len(resp) > 0
+
+    test_info.info("testing control commands")
+    # Test various patterns of control transfers
+    #
+    # Some devices have had problems with back-to-back
+    # control transfers. Intentionally send these sequences
+    # to make sure they are properly handled.
+    for _ in range(100):
+        # Control transfer with a data in stage
+        cdc.get_line_coding()
+    for _ in range(100):
+        # Control transfer with a data out stage followed
+        # by a control transfer with a data in stage
+        cdc.set_line_coding(115200)
+        cdc.get_line_coding()
+    for _ in range(100):
+        # Control transfer with a data out stage
+        cdc.set_line_coding(115200)
+
+    test_info.info("testing endpoint clearing")
+
+    cdc.ep_data_out.clear_halt()
+    cdc.ep_data_out.write('')      # DATA0
+    cdc.ep_data_out.clear_halt()
+    cdc.ep_data_out.write('')      # DATA0
+
+    cdc.ep_data_out.clear_halt()
+    cdc.ep_data_out.write('')      # DATA 0
+    cdc.ep_data_out.write('')      # DATA 1
+    cdc.ep_data_out.clear_halt()
+    cdc.ep_data_out.write('')      # DATA 0
+
+
+def test_all(test_info, cdc, hid, msd):
+    """Test all endpoints in parallel"""
+    mutex = threading.RLock()
+    terminate = False
+    error_msg_list = []
+
+    def _safe_print(message):
+        """Thread safe wrapper to print messages"""
+        with mutex:
+            print(message)
+
+    def _test_msd():
+        """MSD thread entry point for parallel testing"""
+        try:
+            _safe_print("msd started")
+            msd_data = msd.scsi_read10(100, 1)
+            while not terminate:
+                #msd_data = 'x' * 1024 * 16  # 16KB
+                msd.scsi_write10(100, msd_data)
+            _safe_print("msd end")
+        except:
+            error_msg_list.append("MSD test failed")
+            raise
+
+    def _test_cdc():
+        """CDC thread entry point for parallel testing"""
+        try:
+            _safe_print("cdc started")
+            while not terminate:
+                cdc.set_line_coding(115200)
+                cdc.get_line_coding()
+                cdc.send_break(cdc.SEND_BREAK_ON)
+                cdc.send_break(cdc.SEND_BREAK_OFF)
+                cdc.read(1024)
+                cdc.write("Hello world")
+                cdc.read(1024)
+            _safe_print("cdc end")
+        except:
+            error_msg_list.append("CDC test failed")
+            raise
+
+    def _test_hid():
+        """HID thread entry point for parallel testing"""
+        try:
+            _safe_print("hid started")
+            data = bytearray(64)
+            data[0] = 0x80
+            while not terminate:
+                hid.set_report(data)
+                resp = hid.get_report(64)
+                assert resp[0] == 0x80
+            _safe_print("hid end")
+        except:
+            error_msg_list.append("HID test failed")
+            raise
+
+    thread_list = []
+    for function in (_test_msd, _test_cdc, _test_hid):
+        thread = threading.Thread(target=function)
+        thread.start()
+        thread_list.append(thread)
+
+    time.sleep(10)
+
+    terminate = True
+    for thread in thread_list:
+        thread.join()
+
+    for error in error_msg_list:
+        test_info.failure(error)
+
+
+def _daplink_match(dev):
+    """DAPLink match function to be used with usb.core.find"""
+    try:
+        device_string = dev.product
+    except ValueError:
+        return False
+    if device_string is None:
+        return False
+    if device_string.find("CMSIS-DAP") < 0:
+        return False
+    return True
+
+
+def _daplink_from_serial_number(serial_number):
+    """Return a usb handle to the DAPLink device with the serial number"""
+    dev_list = usb.core.find(find_all=True, custom_match=_daplink_match)
+    for dev in dev_list:
+        if dev.serial_number == serial_number:
+            return dev
+    return None
+
+
+def _platform_supports_usb_test():
+    """Return True if this platform supports USB testing, False otherwise"""
+    if os.name != "posix":
+        return False
+    if not hasattr(os, 'uname'):
+        if False:
+            # Hack to supress warnings for uname not existing
+            os.uname = lambda: [None]
+        return False
+    if os.uname()[0] == "Darwin":
+        return False
+    return True
+
+
+def _set_usb_test_mode(hid, enabled):
+    """Set to True to enable USB testing mode"""
+    data = bytearray(64)
+    data[0] = 0x88
+    data[1] = 1 if enabled else 0
+    hid.set_report(data)
+    resp = hid.get_report(64)
+    if (resp[0] != 0x88) or (resp[1] != 1):
+        raise Exception("Error configuring USB test mode")
+
+
+if __name__ == "__main__":
+    main()