Repostiory containing DAPLink source code with Reset Pin workaround for HANI_IOT board.
Upstream: https://github.com/ARMmbed/DAPLink
Diff: test/usb_msd.py
- Revision:
- 0:01f31e923fe2
diff -r 000000000000 -r 01f31e923fe2 test/usb_msd.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/usb_msd.py Tue Apr 07 12:55:42 2020 +0200 @@ -0,0 +1,387 @@ +# +# DAPLink Interface Firmware +# Copyright (c) 2016-2016, ARM Limited, All Rights Reserved +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import struct +import numbers +import time +import usb.util + + +class USBMsd(object): + """Wrapper class for a MSD usb device""" + + # Bulk only transport documented in + # "Universal Serial Bus Mass Storage Class" + # SCSI commands documented in "SCSI Commands Reference Manual" by Seagate + + CLASS_MSD = 0x8 + # Write 10 + # Read 10 + # Test unit ready + # Request Sense + + # dCBWSignature + # dCBWTag + # dCBWDataTransferLength + # bmCBWFlags + # bCBWLUN + # bCBWCBLength + FMT_CBW = "<IIIBBB" + + # dCSWSignature + # dCSWTag + # dCSWDataResidue + # bCSWStatus + FMT_CSW = "<IIIB" + + CSW_STATUS_PASSED = 0 + CSW_STATUS_FAILED = 1 + CSW_STATUS_PHASE_ERROR = 2 + + class SCSIError(Exception): + + def __init__(self, error): + Exception.__init__(self) + self.value = error + + # Some SCSI commands + # Value Keil middleware define Seagate name + # 0x12 SCSI_INQUIRY INQUIRY + # 0x23 SCSI_READ_FORMAT_CAPACITIES Missing + # 0x25 SCSI_READ_CAPACITY READ CAPACITY (10) + # 0x28 SCSI_READ10 READ (10) + # 0x1A SCSI_MODE_SENSE6 MODE SENSE (6) + # 0x00 SCSI_TEST_UNIT_READY TEST UNIT READY + # 0x2A SCSI_WRITE10 WRITE (10) + # 0x03 SCSI_REQUEST_SENSE REQUEST SENSE + # 0x1E SCSI_MEDIA_REMOVAL Missing + + def __init__(self, device): + self._dev = device + self._if = None + self.ep_in = None + self.ep_out = None + self._locked = False + self._cbw_tag = 0 + self.timeout = 60 * 1000 + # delays are for testing only + self.delay_cbw_to_data = 0 + self.delay_data_to_csw = 0 + + # Find interface + for interface in device.get_active_configuration(): + if interface.bInterfaceClass == USBMsd.CLASS_MSD: + assert self._if is None + self._if = interface + assert self._if is not None + + # Find endpoints + for endpoint in self._if: + if endpoint.bEndpointAddress & 0x80: + assert self.ep_in is None + self.ep_in = endpoint + else: + assert self.ep_out is None + self.ep_out = endpoint + assert self.ep_in is not None + assert self.ep_out is not None + + def lock(self): + """Acquire exclisive access to MSD""" + assert not self._locked + + num = self._if.bInterfaceNumber + try: + if self._dev.is_kernel_driver_active(num): + self._dev.detach_kernel_driver(num) + except NotImplementedError: + pass + except usb.core.USBError: + pass + usb.util.claim_interface(self._dev, num) + self._locked = True + + def unlock(self): + """Release exclusive access to MSD""" + assert self._locked + + num = self._if.bInterfaceNumber + usb.util.release_interface(self._dev, num) + try: + self._dev.attach_kernel_driver(num) + except NotImplementedError: + pass + except usb.core.USBError: + pass + self._locked = False + + def scsi_read10(self, lba, block_count): + """Send the SCSI read 10 command and return the data read""" + block_size = 512 + + cbwcb = bytearray(10) + cbwcb[0] = 0x28 + cbwcb[2] = (lba >> (8 * 3)) & 0xFF + cbwcb[3] = (lba >> (8 * 2)) & 0xFF + cbwcb[4] = (lba >> (8 * 1)) & 0xFF + cbwcb[5] = (lba >> (8 * 0)) & 0xFF + cbwcb[7] = (block_count >> (8 * 1)) & 0xFF + cbwcb[8] = (block_count >> (8 * 0)) & 0xFF + ret, data = self._msd_transfer(cbwcb, 0, block_count * block_size) + if ret != self.CSW_STATUS_PASSED: + raise self.SCSIError(ret) + return data + + def scsi_write10(self, lba, data): + """Send the SCSI write 10 command""" + block_size = 512 + + assert len(data) % block_size == 0 + block_count = (len(data) + (block_size - 1)) // block_size + + cbwcb = bytearray(10) + cbwcb[0] = 0x2A + cbwcb[2] = (lba >> (8 * 3)) & 0xFF + cbwcb[3] = (lba >> (8 * 2)) & 0xFF + cbwcb[4] = (lba >> (8 * 1)) & 0xFF + cbwcb[5] = (lba >> (8 * 0)) & 0xFF + cbwcb[7] = (block_count >> (8 * 1)) & 0xFF + cbwcb[8] = (block_count >> (8 * 0)) & 0xFF + ret, _ = self._msd_transfer(cbwcb, 0, data) + if ret != self.CSW_STATUS_PASSED: + raise self.SCSIError(ret) + + def scsi_test_unit_ready(self): + """Send the SCSI test unit ready command and return status""" + cbwcb = bytearray(10) + cbwcb[0] = 0 + ret, _ = self._msd_transfer(cbwcb, 0) + return ret + + def _msd_transfer(self, cbwcb, lun, size_or_data=None): + """Perform a bulk only transfer""" + assert self._locked + assert 1 <= len(cbwcb) <= 16 + + # Increment packet tag + transfer_tag = self._cbw_tag + self._cbw_tag = (self._cbw_tag + 1) & 0xFFFFFFFF + + # None means data size of zero + if size_or_data is None: + size_or_data = 0 + + in_transfer = isinstance(size_or_data, numbers.Number) + transfer_size = (size_or_data if in_transfer else len(size_or_data)) + assert in_transfer or len(size_or_data) > 0 + + # Phase - Command transport + cbw_signature = 0x43425355 + cbw_tag = transfer_tag + cbw_data_transfer_length = transfer_size + cbw_flags = (1 << 7) if in_transfer else 0 + cbw_lun = lun + cbw_length = len(cbwcb) + params = [cbw_signature, cbw_tag, cbw_data_transfer_length, + cbw_flags, cbw_lun, cbw_length] + cbw = struct.pack(self.FMT_CBW, *params) + pad_size = 16 - len(cbwcb) + payload = cbw + cbwcb + bytearray(pad_size) + self.ep_out.write(payload) + + if self.delay_cbw_to_data != 0: + time.sleep(self.delay_cbw_to_data) + + # Phase - Data Out or Data In (Optional) + data = None + if transfer_size > 0: + endpoint = self.ep_in if in_transfer else self.ep_out + try: + if in_transfer: + data = self.ep_in.read(transfer_size, self.timeout) + else: + self.ep_out.write(size_or_data, self.timeout) + except usb.core.USBError: + endpoint.clear_halt() + + if self.delay_data_to_csw != 0: + time.sleep(self.delay_data_to_csw) + + # Phase - Status Transport + csw = self.ep_in.read(13, self.timeout) + csw_signature, csw_tag, csw_data_residue, csw_status = \ + struct.unpack(self.FMT_CSW, csw) + assert csw_signature == 0x53425355 + assert csw_tag == transfer_tag + #TODO - check residue + return (csw_status, data) + + +class Struct(object): + """Base class for a C structure""" + + def __init__(self, name, structure, data): + field_list = [field[0] for field in structure] + fmt_list = [field[1] for field in structure] + format_str = "<" + "".join(fmt_list) + struct_size = struct.calcsize(format_str) + value_list = struct.unpack(format_str, data[:struct_size]) + value_dict = {} + for name, value in zip(field_list, value_list): + value_dict[name] = value + self.name = name + self.format_str = format_str + self.field_list = field_list + self.value_dict = value_dict + self.size = struct_size + + def __getitem__(self, key): + return self.value_dict[key] + + def __setitem__(self, key, value): + self.value_dict[key] = value + + def __str__(self): + desc = "" + desc += self.name + ":" + os.linesep + for field in self.field_list: + value = self.value_dict[field] + if isinstance(value, bytes): + value = list(bytearray(value)) + desc += (" %s=%s" + os.linesep) % (field, value) + return desc + + def pack(self): + """Return a byte representation of this structure""" + value_list = [] + for field in self.field_list: + value_list.append(self.value_dict[field]) + return struct.pack(self.format_str, *value_list) + + +class MBR(Struct): + """Wrapper class for a FAT MBR""" + + STRUCTURE = ( + ("BS_jmpBoot", "3s"), + ("BS_OEMName", "8s"), + ("BPB_BytsPerSec", "H"), + ("BPB_SecPerClus", "B"), + ("BPB_RsvdSecCnt", "H"), + ("BPB_NumFATs", "B"), + ("BPB_RootEntCnt", "H"), + ("BPB_TotSec16", "H"), + ("BPB_Media", "B"), + ("BPB_FATSz16", "H"), + ("BPB_SecPerTrk", "H"), + ("BPB_NumHeads", "H"), + ("BPB_HiddSec", "L"), + ("BPB_TotSec32", "L"), + ) + + def __init__(self, data, sector=None): + Struct.__init__(self, "MBR", self.STRUCTURE, data) + self.sector = sector + + +class DirectoryEntry(Struct): + """Wrapper class for a FAT DirectoryEntry""" + + STRUCTURE = ( + ("DIR_Name", "11s"), + ("DIR_Attr", "B"), + ("DIR_NTRes", "B"), + ("DIR_CrtTimeTenth", "B"), + ("DIR_CrtTime", "H"), + ("DIR_CrtDate", "H"), + ("DIR_LstAccDate", "H"), + ("DIR_FstClusHI", "H"), + ("DIR_WrtTime", "H"), + ("DIR_WrtDate", "H"), + ("DIR_FstClusLO", "H"), + ("DIR_FileSize", "L"), + ) + + def __init__(self, data): + Struct.__init__(self, "DirectoryEntry", self.STRUCTURE, data) + + +class Directory(object): + """Wrapper class for a FAT Directory""" + + ENTRY_SIZE = 32 + + def __init__(self, entry_count, data, sector=None): + directory_list = [] + for i in range(entry_count): + start = i * self.ENTRY_SIZE + dir_data = data[start:start + self.ENTRY_SIZE] + entry = DirectoryEntry(dir_data) + directory_list.append(entry) + self.directory_list = directory_list + self.sector = sector + + def __iter__(self): + return iter(self.directory_list) + + def __getitem__(self, key): + return self.directory_list[key] + + def find_free_entry_index(self): + """Find a free index in this Directory or return None""" + for idx, directory in enumerate(self.directory_list): + name_data = bytearray(directory["DIR_Name"]) + if name_data[0] in (0x00, 0xE5): + return idx + return None + + def pack(self): + """Return a byte a Directory""" + data = bytearray() + for directory in self.directory_list: + data.extend(directory.pack()) + return data + + +class Fat(object): + """Wrapper class for a FAT filesystem on a SCSI device""" + + SECTOR_SIZE = 512 + CLUSTER_SIZE = 4 * 1024 + + def __init__(self, msd): + self.msd = msd + self.reload() + + def reload(self): + """Reload all internal data of this Fat filesystem""" + + # Read MBR + mbr_data = self.msd.scsi_read10(0, 1) + mbr = MBR(mbr_data, 0) + + # Read in the root directory + root_dir_sec = (mbr["BPB_RsvdSecCnt"] + + (mbr["BPB_NumFATs"] * mbr["BPB_FATSz16"])) + sec_count = (mbr["BPB_RootEntCnt"] * 32 + 512 - 1) // 512 + root_dir_data = self.msd.scsi_read10(root_dir_sec, sec_count) + root_dir = Directory(mbr["BPB_RootEntCnt"], root_dir_data, + root_dir_sec) + self.mbr = mbr + self.root_dir = root_dir