Arrow / Mbed OS DAPLink Reset
Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers usb_test.py Source File

usb_test.py

00001 #
00002 # DAPLink Interface Firmware
00003 # Copyright (c) 2016-2016, ARM Limited, All Rights Reserved
00004 # SPDX-License-Identifier: Apache-2.0
00005 #
00006 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00007 # not use this file except in compliance with the License.
00008 # You may obtain a copy of the License at
00009 #
00010 # http://www.apache.org/licenses/LICENSE-2.0
00011 #
00012 # Unless required by applicable law or agreed to in writing, software
00013 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00014 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015 # See the License for the specific language governing permissions and
00016 # limitations under the License.
00017 #
00018 """ Module for low level and targeted USB tests"""
00019 
00020 from __future__ import print_function
00021 
00022 import os
00023 import usb.core
00024 import functools
00025 import threading
00026 import time
00027 from usb_cdc import USBCdc
00028 from usb_hid import USBHid
00029 from usb_msd import USBMsd, Fat
00030 
# Time limit, in seconds, that test_msd allows for the drive to dismount
# after a remount is triggered and, separately, for it to mount again
DISMOUNT_TIME_S = 10.0
00032 
00033 
def test_usb(workspace, parent_test, force=False):
    """Run raw USB tests

    Requirements:
        -daplink-validation must be loaded for the target.

    The full set of interface tests is run twice: once with the USB test
    mode switched off and once with it switched on.

    Args:
        workspace: Object whose ``board.get_unique_id()`` returns the
            serial number of the board to test.
        parent_test: Test info object used for reporting. All results are
            recorded on a subtest named "USB test" created from it.
        force: When True the tests run even if the platform check fails.
    """

    # Only test on supported platforms
    if not _platform_supports_usb_test() and not force:
        parent_test.info("Skipping USB test on this platform")
        return
    test_info = parent_test.create_subtest("USB test")

    # Find the device under test
    serial_number = workspace.board.get_unique_id()
    dev = _daplink_from_serial_number(serial_number)
    if dev is None:
        test_info.failure("Could not find board with serial number %s" %
                          serial_number)
        return

    # Create wrappers for the interfaces
    cdc = USBCdc(dev)
    hid = USBHid(dev)
    msd = USBMsd(dev)

    # Interfaces that were successfully locked, in acquisition order.
    # Tracking them lets the cleanup release exactly what was acquired,
    # even when one of the lock() calls itself fails.
    locked = []
    try:
        # Acquire exclusive access to the interfaces
        for interface in (cdc, hid, msd):
            interface.lock()
            locked.append(interface)

        for usb_test_on in (False, True):

            _set_usb_test_mode(hid, usb_test_on)

            test_cdc(test_info, cdc)

            test_hid(test_info, hid)

            test_msd(test_info, msd)

            test_msd_stall(test_info, msd)

            test_control(test_info, dev, cdc, hid, msd)

            test_all(test_info, cdc, hid, msd)

            # TODO - future enhancements
            #  MSD remount + hid
            #  STALL IN and STALL OUT

    finally:
        try:
            # Best effort: leave the board with test mode off, but only
            # talk to the HID interface if it was actually acquired
            if any(interface is hid for interface in locked):
                _set_usb_test_mode(hid, False)
        except usb.core.USBError:
            pass
        finally:
            # Runs even when restoring the test mode raises something
            # other than USBError, so the locks are never leaked
            for interface in locked:
                interface.unlock()
00093 
00094 
def main():
    """Run the usb test as a stand alone program

    Every USB device accepted by _daplink_match is tested in turn, using
    a mock workspace that only knows the board's serial number and a
    stub test info object. The platform check is bypassed.
    """

    import test_info
    import mock

    def get_unique_id(unique_id):
        """Mock function to return a unique id"""
        return unique_id

    for board in usb.core.find(find_all=True, custom_match=_daplink_match):
        serial = board.serial_number
        print("Testing board %s" % serial)
        print("----------------")

        # Minimal stand-in for the real workspace: test_usb only asks the
        # board for its unique id
        workspace = mock.Mock()
        workspace.board = mock.Mock()
        workspace.board.unique_id = board.serial_number
        workspace.board.get_unique_id = functools.partial(get_unique_id,
                                                          serial)

        test_usb(workspace, test_info.TestInfoStub(), force=True)
00116 
00117 
def test_cdc(test_info, cdc):
    """Smoke test of the CDC endpoint

    Sets and reads back the line coding, toggles the break signal, and
    logs whatever the serial port returns before and after writing
    "Hello world".

    Args:
        test_info: Test info object that receives the log messages.
        cdc: CDC interface wrapper of the device under test.
    """
    cdc.set_line_coding(115200)
    line_coding = cdc.get_line_coding()
    baud, fmt, parity, databits = line_coding
    summary = "Baud %i, fmt %i, parity %i, databits %i" % (
        baud, fmt, parity, databits)
    test_info.info(summary)

    # Assert the break condition and release it again
    cdc.send_break(cdc.SEND_BREAK_ON)
    cdc.send_break(cdc.SEND_BREAK_OFF)

    # Log pending data, send a message, then log what comes back
    before_write = cdc.read(1024)
    test_info.info("Serial port data: %s" % bytearray(before_write))
    cdc.write("Hello world")
    after_write = cdc.read(1024)
    test_info.info("Serial port data2: %s" % bytearray(after_write))
00131 
00132 
def test_hid(test_info, hid):
    """Smoke test of the HID endpoint

    Sets the idle rate, logs the report descriptor, then sends the
    CMSIS-DAP vendor command 0x80 (get the serial number) and logs the
    string carried in the response.

    Args:
        test_info: Test info object that receives the log messages.
        hid: HID interface wrapper of the device under test.
    """
    hid.set_idle()
    report = hid.get_descriptor(hid.DESC_TYPE_REPORT, 0)
    test_info.info("Report descriptor: %s" % report)

    # Send CMSIS-DAP vendor command to get the serial number
    data = bytearray(64)
    data[0] = 0x80
    hid.set_report(data)
    resp = hid.get_report(64)

    # Response layout: byte 0 echoes the command, byte 1 holds the length
    # of the string and the string itself follows the length byte. The
    # payload therefore starts at index 2; slicing from index 1 would
    # include the length byte and cut off the last character.
    length = resp[1]
    payload = bytearray(resp[2:2 + length])
    test_info.info("CMSIS-DAP response: %s" % payload.decode("utf-8"))
00146 
00147 
def test_msd(test_info, msd):
    """MSD endpoint tests

    Reads the first sector, prints the FAT MBR and the used root
    directory entries, then adds a "REFRESH ACT" entry to the root
    directory to trigger a remount. The drive is expected to first start
    failing reads (dismount) and then to answer them again (mount), each
    within DISMOUNT_TIME_S seconds.

    Args:
        test_info: Test info object that receives messages and failures.
        msd: MSD interface wrapper of the device under test.
    """

    # Simple read
    first_sector = msd.scsi_read10(0, 1)
    test_info.info("MBR[0:16]: %s" % first_sector[0:16])

    # Test FAT filesystem
    filesystem = Fat(msd)
    print(filesystem.mbr)

    # Print the root directory entries whose name does not start with NUL
    directory = filesystem.root_dir
    for item in directory:
        if item["DIR_Name"][0] == "\0":
            continue
        print(item)

    # Trigger a remount
    free_slot = directory.find_free_entry_index()
    directory[free_slot]["DIR_Name"] = "REFRESH ACT"
    packed_directory = directory.pack()
    msd.scsi_write10(directory.sector, packed_directory)

    test_info.info("Added file to root directory")

    # Poll until a read fails, which signals that the drive went away
    dismounted = False
    began = time.time()
    while time.time() - began < DISMOUNT_TIME_S:
        try:
            msd.scsi_read10(0, 1)
        except msd.SCSIError:
            dismounted = True
            break
    if dismounted:
        test_info.info("Dismount detected")
    else:
        test_info.failure("Device failed to dismount")

    # Poll until a read succeeds again, which signals that it came back
    mounted = False
    began = time.time()
    while not mounted and time.time() - began < DISMOUNT_TIME_S:
        try:
            msd.scsi_read10(0, 1)
        except msd.SCSIError:
            continue
        mounted = True
    if mounted:
        test_info.info("Mount detected")
    else:
        test_info.failure("Device failed to mount")
00192 
00193 
def _msd_check_still_works(msd):
    """Pause, then make sure the device still reads and writes sector 0"""
    time.sleep(3)
    msd.scsi_read10(0, 1)
    msd.scsi_write10(0, bytearray(512))


def test_msd_stall(test_info, msd):
    """Test stalls coming at various times in the middle of MSD xfers

    Each case rewrites the root directory with a "REFRESH ACT" entry to
    start a media removal, then runs one transfer whose stages are spread
    out with the delay settings of ``msd`` so that the removal lands
    between two stages. After every case the device is checked to still
    read and write normally.

    Args:
        test_info: Test info object that receives messages and failures.
        msd: MSD interface wrapper of the device under test.

    Raises:
        AssertionError: If the delayed read does not return 512 bytes.
    """
    fat = Fat(msd)
    root_dir = fat.root_dir
    dir_idx = root_dir.find_free_entry_index()
    root_dir[dir_idx]["DIR_Name"] = "REFRESH ACT"
    root_dir_data = root_dir.pack()

    def start_media_removal():
        """Write the root directory containing the REFRESH ACT entry"""
        msd.scsi_write10(root_dir.sector, root_dir_data)

    # Test that a write fails if media is removed after the CBW
    # stage but before the CSW stage
    start_media_removal()
    msd.delay_cbw_to_data = 1.0
    retval = msd.CSW_STATUS_PASSED
    try:
        msd.scsi_write10(0, bytearray(512))
        test_info.failure("Device failed to stall data stage")
    except msd.SCSIError as error:
        retval = error.value
    finally:
        # Always restore the delay so later transfers are not affected,
        # even when the write raises something other than SCSIError
        msd.delay_cbw_to_data = 0
    _msd_check_still_works(msd)
    if retval == msd.CSW_STATUS_FAILED:
        test_info.info("Test CBW,Stall,Data OUT - Pass")
    else:
        test_info.failure("Device returned wrong status")

    # Test that a write succeeds even if media is removed
    # after the OUT stage but before the CSW stage
    start_media_removal()
    msd.delay_data_to_csw = 1.0
    try:
        msd.scsi_write10(0, bytearray(512))
    finally:
        msd.delay_data_to_csw = 0
    _msd_check_still_works(msd)
    test_info.info("Test DATA OUT,Stall,CSW - Pass")

    # Test that a read succeeds even if media is removed
    # after the IN stage but before the CSW stage
    start_media_removal()
    msd.delay_data_to_csw = 1.0
    try:
        resp = msd.scsi_read10(0, 1)
    finally:
        msd.delay_data_to_csw = 0
    assert len(resp) == 512
    _msd_check_still_works(msd)
    test_info.info("Test DATA IN,Stall,CSW - Pass")

    # Test that a test unit ready succeeds even if media is removed
    # after the CBW stage but before the CSW stage
    start_media_removal()
    msd.delay_data_to_csw = 1.0
    try:
        resp = msd.scsi_test_unit_ready()
    finally:
        msd.delay_data_to_csw = 0
    _msd_check_still_works(msd)
    if resp == msd.CSW_STATUS_PASSED:
        test_info.info("Test CBW,Stall,CSW - Pass")
    else:
        test_info.failure("Test CBW,Stall,CSW - Failed")

    # Test that a test unit ready fails when it is only sent one second
    # after the media removal was started, i.e. the wait happens before
    # the command instead of between its stages. The log text is the same
    # as in the previous case; it is kept unchanged so existing log
    # output stays identical.
    start_media_removal()
    time.sleep(1.0)
    resp = msd.scsi_test_unit_ready()
    _msd_check_still_works(msd)
    if resp == msd.CSW_STATUS_FAILED:
        test_info.info("Test CBW,Stall,CSW - Pass")
    else:
        test_info.failure("Test CBW,Stall,CSW - Failed")
00275 
00276 
def test_control(test_info, dev, cdc, hid, msd):
    """Test for the control endpoint

    Requests the configuration descriptor with a 256 byte transfer size,
    sends bursts of back-to-back CDC control transfers, and finally
    alternates halt clearing with empty writes on the CDC data OUT
    endpoint.

    Args:
        test_info: Test info object that receives the log messages.
        dev: USB device handle used for the raw control transfer.
        cdc: CDC interface wrapper of the device under test.
        hid: Not used by this test.
        msd: Not used by this test.

    Raises:
        AssertionError: If the descriptor request returns no data.
    """

    def clear_halt():
        """Clear the halt condition of the CDC data OUT endpoint"""
        cdc.ep_data_out.clear_halt()

    def write_empty():
        """Send an empty packet on the CDC data OUT endpoint"""
        cdc.ep_data_out.write('')

    test_info.info("testing control transfer with size a multiple of 256")
    # Arguments, in order: request type 0x80, request 0x06 (get
    # descriptor), value 0x200 (configuration descriptor), index 0
    # (always 0 for this request) and a transfer size of 256 bytes
    descriptor = dev.ctrl_transfer(0x80, 0x06, 0x200, 0, 256)
    assert len(descriptor) > 0

    test_info.info("testing control commands")
    # Test various patterns of control transfers
    #
    # Some devices have had problems with back-to-back
    # control transfers. Intentionally send these sequences
    # to make sure they are properly handled.
    repeat = 100
    # Control transfers with a data in stage only
    for _ in range(repeat):
        cdc.get_line_coding()
    # A data out stage transfer followed by a data in stage transfer
    for _ in range(repeat):
        cdc.set_line_coding(115200)
        cdc.get_line_coding()
    # Control transfers with a data out stage only
    for _ in range(repeat):
        cdc.set_line_coding(115200)

    test_info.info("testing endpoint clearing")

    # Two rounds of: clear the halt, then send one packet (DATA0)
    for _ in range(2):
        clear_halt()
        write_empty()

    # Clear, send two packets (DATA0 then DATA1), clear again and send
    # one more packet (DATA0)
    clear_halt()
    write_empty()
    write_empty()
    clear_halt()
    write_empty()
00318 
00319 
def test_all(test_info, cdc, hid, msd):
    """Test all endpoints in parallel

    Starts one worker thread per interface, lets them run for ten
    seconds, then asks them to stop and waits for them. A worker that
    raises records a failure message, which is reported once all workers
    have finished.

    Args:
        test_info: Test info object that receives the failures.
        cdc: CDC interface wrapper of the device under test.
        hid: HID interface wrapper of the device under test.
        msd: MSD interface wrapper of the device under test.
    """
    mutex = threading.RLock()
    # Set when the workers should stop looping
    stop_requested = threading.Event()
    error_msg_list = []

    def _safe_print(message):
        """Thread safe wrapper to print messages"""
        with mutex:
            print(message)

    def _test_msd():
        """MSD thread entry point for parallel testing"""
        try:
            _safe_print("msd started")
            # Keep writing back the data that was read from sector 100
            msd_data = msd.scsi_read10(100, 1)
            while not stop_requested.is_set():
                msd.scsi_write10(100, msd_data)
            _safe_print("msd end")
        except Exception:
            error_msg_list.append("MSD test failed")
            raise

    def _test_cdc():
        """CDC thread entry point for parallel testing"""
        try:
            _safe_print("cdc started")
            while not stop_requested.is_set():
                cdc.set_line_coding(115200)
                cdc.get_line_coding()
                cdc.send_break(cdc.SEND_BREAK_ON)
                cdc.send_break(cdc.SEND_BREAK_OFF)
                cdc.read(1024)
                cdc.write("Hello world")
                cdc.read(1024)
            _safe_print("cdc end")
        except Exception:
            error_msg_list.append("CDC test failed")
            raise

    def _test_hid():
        """HID thread entry point for parallel testing"""
        try:
            _safe_print("hid started")
            data = bytearray(64)
            data[0] = 0x80
            while not stop_requested.is_set():
                hid.set_report(data)
                resp = hid.get_report(64)
                # The first response byte must echo the command
                assert resp[0] == 0x80
            _safe_print("hid end")
        except Exception:
            error_msg_list.append("HID test failed")
            raise

    thread_list = []
    try:
        for function in (_test_msd, _test_cdc, _test_hid):
            thread = threading.Thread(target=function)
            thread.start()
            thread_list.append(thread)

        time.sleep(10)
    finally:
        # Always stop and collect the workers, also when the sleep is
        # interrupted or a thread could not be started; otherwise they
        # would keep looping forever
        stop_requested.set()
        for thread in thread_list:
            thread.join()

    for error in error_msg_list:
        test_info.failure(error)
00390 
00391 
def _daplink_match(dev):
    """DAPLink match function to be used with usb.core.find

    Returns True when the product string of ``dev`` contains "CMSIS-DAP".
    Returns False when the string is None, does not contain that text, or
    reading it raises ValueError.
    """
    try:
        product = dev.product
    except ValueError:
        return False
    return product is not None and "CMSIS-DAP" in product
00403 
00404 
def _daplink_from_serial_number(serial_number):
    """Return a usb handle to the DAPLink device with the serial number

    Only devices accepted by _daplink_match are considered. The first one
    whose serial number equals ``serial_number`` is returned, or None
    when there is no such device.
    """
    candidates = usb.core.find(find_all=True, custom_match=_daplink_match)
    matching = (dev for dev in candidates
                if dev.serial_number == serial_number)
    return next(matching, None)
00412 
00413 
def _platform_supports_usb_test():
    """Return True if this platform supports USB testing, False otherwise

    The platform is reported as unsupported when ``os.name`` is not
    "posix", when ``os.uname`` is not available, or when ``os.uname()``
    reports "Darwin" as the system name.
    """
    if os.name != "posix":
        return False
    # Fetch uname through getattr so that platforms without it are
    # handled without touching a missing attribute
    uname = getattr(os, "uname", None)
    if uname is None:
        return False
    return uname()[0] != "Darwin"
00426 
00427 
def _set_usb_test_mode(hid, enabled):
    """Switch the USB testing mode of the device on or off

    Sends a 64 byte report whose first byte is the command 0x88 and whose
    second byte is 1 to enable or 0 to disable the mode, then reads a 64
    byte response.

    Args:
        hid: HID interface wrapper used to exchange the reports.
        enabled: True to enable USB testing mode, False to disable it.

    Raises:
        Exception: If the response does not start with the bytes 0x88, 1.
    """
    request = bytearray(64)
    request[0] = 0x88
    request[1] = int(bool(enabled))
    hid.set_report(request)

    reply = hid.get_report(64)
    acknowledged = reply[0] == 0x88 and reply[1] == 1
    if not acknowledged:
        raise Exception("Error configuring USB test mode")
00437 
00438 
# Script entry point: run the stand alone test when this file is executed
# directly rather than imported
if __name__ == "__main__":
    main()