Arrow / Mbed OS DAPLink Reset
Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers usb_msd.py Source File

usb_msd.py

00001 #
00002 # DAPLink Interface Firmware
00003 # Copyright (c) 2016-2016, ARM Limited, All Rights Reserved
00004 # SPDX-License-Identifier: Apache-2.0
00005 #
00006 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00007 # not use this file except in compliance with the License.
00008 # You may obtain a copy of the License at
00009 #
00010 # http://www.apache.org/licenses/LICENSE-2.0
00011 #
00012 # Unless required by applicable law or agreed to in writing, software
00013 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00014 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015 # See the License for the specific language governing permissions and
00016 # limitations under the License.
00017 #
00018 
00019 import os
00020 import struct
00021 import numbers
00022 import time
00023 import usb.util
00024 
00025 
class USBMsd(object):
    """Wrapper class for a MSD usb device.

    Locates the mass-storage interface and its two endpoints on a USB
    device object and issues SCSI commands to it using the bulk-only
    transport: a Command Block Wrapper (CBW) is written, an optional data
    phase follows, and a Command Status Wrapper (CSW) is read back.

    ``lock()`` must be called before any SCSI command is sent and
    ``unlock()`` when done.
    """

    # Bulk only transport documented in
    #     "Universal Serial Bus Mass Storage Class"
    # SCSI commands documented in "SCSI Commands Reference Manual" by Seagate

    CLASS_MSD = 0x8

    # SCSI operation codes issued by this class
    SCSI_TEST_UNIT_READY = 0x00
    SCSI_READ10 = 0x28
    SCSI_WRITE10 = 0x2A

    # Logical block size assumed by scsi_read10/scsi_write10
    BLOCK_SIZE = 512

    # dCBWSignature
    # dCBWTag
    # dCBWDataTransferLength
    # bmCBWFlags
    # bCBWLUN
    # bCBWCBLength
    FMT_CBW = "<IIIBBB"
    CBW_SIGNATURE = 0x43425355
    CBW_FLAG_DATA_IN = 1 << 7
    # The command block that follows the CBW header is always padded to
    # this many bytes
    CBWCB_MAX_SIZE = 16

    # dCSWSignature
    # dCSWTag
    # dCSWDataResidue
    # bCSWStatus
    FMT_CSW = "<IIIB"
    CSW_SIGNATURE = 0x53425355
    CSW_SIZE = 13

    CSW_STATUS_PASSED = 0
    CSW_STATUS_FAILED = 1
    CSW_STATUS_PHASE_ERROR = 2

    class SCSIError(Exception):
        """Raised when a SCSI command completes with a non-passing status.

        The raw bCSWStatus value is available as ``value`` and is also
        included in the exception message so it shows up in tracebacks.
        """

        def __init__(self, error):
            Exception.__init__(
                self, "SCSI command failed with CSW status %s" % (error,))
            self.value = error

    # Some SCSI commands
    # Value   Keil middleware define         Seagate name
    # 0x12    SCSI_INQUIRY                   INQUIRY
    # 0x23    SCSI_READ_FORMAT_CAPACITIES    Missing
    # 0x25    SCSI_READ_CAPACITY             READ CAPACITY (10)
    # 0x28    SCSI_READ10                    READ (10)
    # 0x1A    SCSI_MODE_SENSE6               MODE SENSE (6)
    # 0x00    SCSI_TEST_UNIT_READY           TEST UNIT READY
    # 0x2A    SCSI_WRITE10                   WRITE (10)
    # 0x03    SCSI_REQUEST_SENSE             REQUEST SENSE
    # 0x1E    SCSI_MEDIA_REMOVAL             Missing

    def __init__(self, device):
        """Locate the MSD interface and its IN/OUT endpoints on *device*.

        Asserts that the active configuration contains exactly one
        interface of class CLASS_MSD and that this interface has exactly
        one IN endpoint (address bit 7 set) and one OUT endpoint.
        """
        self._dev = device
        self._if = None
        self.ep_in = None
        self.ep_out = None
        self._locked = False
        self._cbw_tag = 0
        # Timeout passed to the data-phase and CSW endpoint transfers
        self.timeout = 60 * 1000
        # delays are for testing only
        self.delay_cbw_to_data = 0
        self.delay_data_to_csw = 0

        # Find interface
        for interface in device.get_active_configuration():
            if interface.bInterfaceClass == USBMsd.CLASS_MSD:
                assert self._if is None
                self._if = interface
        assert self._if is not None

        # Find endpoints
        for endpoint in self._if:
            if endpoint.bEndpointAddress & 0x80:
                assert self.ep_in is None
                self.ep_in = endpoint
            else:
                assert self.ep_out is None
                self.ep_out = endpoint
        assert self.ep_in is not None
        assert self.ep_out is not None

    def lock(self):
        """Acquire exclusive access to MSD"""
        assert not self._locked

        num = self._if.bInterfaceNumber
        # Detaching the kernel driver is best effort: errors from either
        # call are ignored and the interface is claimed regardless.
        try:
            if self._dev.is_kernel_driver_active(num):
                self._dev.detach_kernel_driver(num)
        except (NotImplementedError, usb.core.USBError):
            pass
        usb.util.claim_interface(self._dev, num)
        self._locked = True

    def unlock(self):
        """Release exclusive access to MSD"""
        assert self._locked

        num = self._if.bInterfaceNumber
        usb.util.release_interface(self._dev, num)
        # Re-attaching the kernel driver is best effort, mirroring lock()
        try:
            self._dev.attach_kernel_driver(num)
        except (NotImplementedError, usb.core.USBError):
            pass
        self._locked = False

    @staticmethod
    def _rw10_cdb(opcode, lba, block_count):
        """Build the 10 byte command block shared by READ(10) and WRITE(10).

        Layout: opcode, one zero byte, 32-bit big-endian LBA, one zero
        byte, 16-bit big-endian block count, one zero byte.  Values wider
        than their field are truncated to the low bits.
        """
        return bytearray(struct.pack(">BBIBHB", opcode, 0,
                                     lba & 0xFFFFFFFF, 0,
                                     block_count & 0xFFFF, 0))

    def scsi_read10(self, lba, block_count):
        """Send the SCSI read 10 command and return the data read

        Requests ``block_count`` blocks of BLOCK_SIZE bytes starting at
        block ``lba``.  Raises SCSIError if the returned status is not
        CSW_STATUS_PASSED.
        """
        cbwcb = self._rw10_cdb(self.SCSI_READ10, lba, block_count)
        ret, data = self._msd_transfer(cbwcb, 0,
                                       block_count * self.BLOCK_SIZE)
        if ret != self.CSW_STATUS_PASSED:
            raise self.SCSIError(ret)
        return data

    def scsi_write10(self, lba, data):
        """Send the SCSI write 10 command

        ``data`` must be a non-empty multiple of BLOCK_SIZE bytes and is
        written starting at block ``lba``.  Raises SCSIError if the
        returned status is not CSW_STATUS_PASSED.
        """
        block_size = self.BLOCK_SIZE

        assert len(data) % block_size == 0
        block_count = (len(data) + (block_size - 1)) // block_size

        cbwcb = self._rw10_cdb(self.SCSI_WRITE10, lba, block_count)
        ret, _ = self._msd_transfer(cbwcb, 0, data)
        if ret != self.CSW_STATUS_PASSED:
            raise self.SCSIError(ret)

    def scsi_test_unit_ready(self):
        """Send the SCSI test unit ready command and return status"""
        # A zero-filled 10 byte command block is sent; the size is kept
        # as-is so the bytes on the wire do not change.
        cbwcb = bytearray(10)
        cbwcb[0] = self.SCSI_TEST_UNIT_READY
        ret, _ = self._msd_transfer(cbwcb, 0)
        return ret

    def _msd_transfer(self, cbwcb, lun, size_or_data=None):
        """Perform a bulk only transfer

        cbwcb        -- command block, 1 to 16 bytes
        lun          -- logical unit number placed in the CBW
        size_or_data -- a number requests a device-to-host transfer of
                        that many bytes; a non-empty buffer is sent
                        host-to-device; None means no data phase

        Returns ``(csw_status, data)`` where ``data`` is what was read
        during the data phase, or None if nothing was read.
        """
        assert self._locked
        assert 1 <= len(cbwcb) <= self.CBWCB_MAX_SIZE

        # Increment packet tag
        transfer_tag = self._cbw_tag
        self._cbw_tag = (self._cbw_tag + 1) & 0xFFFFFFFF

        # None means data size of zero
        if size_or_data is None:
            size_or_data = 0

        in_transfer = isinstance(size_or_data, numbers.Number)
        transfer_size = size_or_data if in_transfer else len(size_or_data)
        assert in_transfer or len(size_or_data) > 0

        # Phase - Command transport
        cbw_flags = self.CBW_FLAG_DATA_IN if in_transfer else 0
        cbw = struct.pack(self.FMT_CBW, self.CBW_SIGNATURE, transfer_tag,
                          transfer_size, cbw_flags, lun, len(cbwcb))
        pad_size = self.CBWCB_MAX_SIZE - len(cbwcb)
        payload = cbw + cbwcb + bytearray(pad_size)
        self.ep_out.write(payload)

        if self.delay_cbw_to_data != 0:
            time.sleep(self.delay_cbw_to_data)

        # Phase - Data Out or Data In (Optional)
        data = None
        if transfer_size > 0:
            endpoint = self.ep_in if in_transfer else self.ep_out
            try:
                if in_transfer:
                    data = endpoint.read(transfer_size, self.timeout)
                else:
                    endpoint.write(size_or_data, self.timeout)
            except usb.core.USBError:
                # A failed data phase is not fatal here: clear the halt on
                # the endpoint and go on to read the status.
                endpoint.clear_halt()

        if self.delay_data_to_csw != 0:
            time.sleep(self.delay_data_to_csw)

        # Phase - Status Transport
        csw = self.ep_in.read(self.CSW_SIZE, self.timeout)
        csw_signature, csw_tag, _residue, csw_status = \
            struct.unpack(self.FMT_CSW, csw)
        assert csw_signature == self.CSW_SIGNATURE
        assert csw_tag == transfer_tag
        #TODO - check residue
        return (csw_status, data)
00234 
00235 
class Struct(object):
    """Base class for a C structure

    A structure is described by a sequence of ``(field_name, format)``
    pairs, where ``format`` is a ``struct`` module format code.  The
    fields are decoded little-endian with no padding from the start of
    ``data``; decoded values are read and written with ``obj[field]``.
    """

    def __init__(self, name, structure, data):
        """Decode ``data`` according to ``structure``.

        name      -- label for this structure, used as the header of str()
        structure -- sequence of (field_name, struct_format_code) pairs
        data      -- buffer holding at least ``size`` bytes; anything past
                     the structure is ignored

        struct.error is raised if ``data`` is too short.
        """
        field_list = [field[0] for field in structure]
        format_str = "<" + "".join(field[1] for field in structure)
        struct_size = struct.calcsize(format_str)
        value_list = struct.unpack(format_str, data[:struct_size])
        # Built with dict(zip()) rather than a loop variable called
        # "name", which would overwrite the ``name`` argument and leave
        # self.name holding the last field name.
        self.name = name
        self.format_str = format_str
        self.field_list = field_list
        self.value_dict = dict(zip(field_list, value_list))
        self.size = struct_size

    def __getitem__(self, key):
        return self.value_dict[key]

    def __setitem__(self, key, value):
        self.value_dict[key] = value

    def __str__(self):
        lines = [self.name + ":"]
        for field in self.field_list:
            value = self.value_dict[field]
            if isinstance(value, bytes):
                # Show raw byte fields as a list of integers
                value = list(bytearray(value))
            lines.append("    %s=%s" % (field, value))
        # Every line, including the last, is terminated by os.linesep
        return os.linesep.join(lines) + os.linesep

    def pack(self):
        """Return a byte representation of this structure"""
        values = [self.value_dict[field] for field in self.field_list]
        return struct.pack(self.format_str, *values)
00276 
00277 
class MBR(Struct):
    """Wrapper class for a FAT MBR

    Decodes the leading 36 bytes of a sector into the named fields listed
    in STRUCTURE.  ``sector`` records the sector number the data came
    from, or None when the caller did not supply one.
    """

    # (field name, struct format code) in on-disk order
    STRUCTURE = (
        ("BS_jmpBoot", "3s"),       # 3 raw bytes
        ("BS_OEMName", "8s"),       # 8 raw bytes
        ("BPB_BytsPerSec", "H"),    # 16-bit
        ("BPB_SecPerClus", "B"),    # 8-bit
        ("BPB_RsvdSecCnt", "H"),    # 16-bit
        ("BPB_NumFATs", "B"),       # 8-bit
        ("BPB_RootEntCnt", "H"),    # 16-bit
        ("BPB_TotSec16", "H"),      # 16-bit
        ("BPB_Media", "B"),         # 8-bit
        ("BPB_FATSz16", "H"),       # 16-bit
        ("BPB_SecPerTrk", "H"),     # 16-bit
        ("BPB_NumHeads", "H"),      # 16-bit
        ("BPB_HiddSec", "L"),       # 32-bit
        ("BPB_TotSec32", "L"),      # 32-bit
    )

    def __init__(self, data, sector=None):
        super(MBR, self).__init__("MBR", self.STRUCTURE, data)
        self.sector = sector
00301 
00302 
class DirectoryEntry(Struct):
    """Wrapper class for a FAT DirectoryEntry

    Decodes one 32 byte directory entry into the named fields listed in
    STRUCTURE.
    """

    # (field name, struct format code) in on-disk order
    STRUCTURE = (
        ("DIR_Name", "11s"),            # 11 raw bytes
        ("DIR_Attr", "B"),              # 8-bit
        ("DIR_NTRes", "B"),             # 8-bit
        ("DIR_CrtTimeTenth", "B"),      # 8-bit
        ("DIR_CrtTime", "H"),           # 16-bit
        ("DIR_CrtDate", "H"),           # 16-bit
        ("DIR_LstAccDate", "H"),        # 16-bit
        ("DIR_FstClusHI", "H"),         # 16-bit
        ("DIR_WrtTime", "H"),           # 16-bit
        ("DIR_WrtDate", "H"),           # 16-bit
        ("DIR_FstClusLO", "H"),         # 16-bit
        ("DIR_FileSize", "L"),          # 32-bit
    )

    def __init__(self, data):
        super(DirectoryEntry, self).__init__(
            "DirectoryEntry", self.STRUCTURE, data)
00323 
00324 
class Directory(object):
    """Wrapper class for a FAT Directory

    Holds an ordered list of DirectoryEntry objects decoded from a raw
    buffer.  Supports iteration and indexing over the entries.
    """

    ENTRY_SIZE = 32

    def __init__(self, entry_count, data, sector=None):
        """Decode ``entry_count`` consecutive entries from ``data``.

        ``sector`` records the sector the directory was read from, or
        None when the caller did not supply one.
        """
        step = self.ENTRY_SIZE
        self.directory_list = [
            DirectoryEntry(data[offset:offset + step])
            for offset in range(0, entry_count * step, step)
        ]
        self.sector = sector

    def __iter__(self):
        return iter(self.directory_list)

    def __getitem__(self, key):
        return self.directory_list[key]

    def find_free_entry_index(self):
        """Find a free index in this Directory or return None"""
        # An entry is treated as free when the first byte of its name is
        # 0x00 or 0xE5.
        free_markers = (0x00, 0xE5)
        return next(
            (position
             for position, entry in enumerate(self.directory_list)
             if bytearray(entry["DIR_Name"])[0] in free_markers),
            None)

    def pack(self):
        """Return the byte representation of this Directory"""
        packed = bytearray()
        for entry in self.directory_list:
            packed.extend(entry.pack())
        return packed
00360 
00361 
class Fat(object):
    """Wrapper class for a FAT filesystem on a SCSI device

    After construction, and after every reload(), ``mbr`` holds the
    decoded sector 0 and ``root_dir`` the decoded root directory.
    """

    SECTOR_SIZE = 512
    CLUSTER_SIZE = 4 * 1024

    def __init__(self, msd):
        self.msd = msd
        self.reload()

    def reload(self):
        """Reload all internal data of this Fat filesystem"""

        # Sector 0 carries the fields that locate the root directory
        boot = MBR(self.msd.scsi_read10(0, 1), 0)

        # The root directory starts right after the reserved sectors and
        # all copies of the FAT
        fat_sectors = boot["BPB_NumFATs"] * boot["BPB_FATSz16"]
        root_lba = boot["BPB_RsvdSecCnt"] + fat_sectors

        # 32 bytes per entry, rounded up to whole 512 byte sectors
        entry_total = boot["BPB_RootEntCnt"]
        root_sectors = (entry_total * 32 + 511) // 512

        raw_root = self.msd.scsi_read10(root_lba, root_sectors)
        root = Directory(entry_total, raw_root, root_lba)

        # Publish only after everything was read and decoded
        self.mbr = boot
        self.root_dir = root