Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
usb_msd.py
#
# DAPLink Interface Firmware
# Copyright (c) 2016-2016, ARM Limited, All Rights Reserved
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import struct
import numbers
import time
# usb.core is imported explicitly because usb.core.USBError is referenced
# below; relying on "import usb.util" to pull it in is an implementation
# detail of the usb package.
import usb.core
import usb.util


class USBMsd(object):
    """Wrapper class for a MSD usb device

    Locates the mass storage interface and its two endpoints on a usb
    device and implements just enough of the bulk only transport to send
    a few SCSI commands (READ 10, WRITE 10, TEST UNIT READY).

    lock() must be called before any SCSI command is sent and unlock()
    should be called when done.
    """

    # Bulk only transport documented in
    #     "Universal Serial Bus Mass Storage Class"
    # SCSI commands documented in "SCSI Commands Reference Manual" by Seagate

    CLASS_MSD = 0x8
    # Write 10
    # Read 10
    # Test unit ready
    # Request Sense

    # dCBWSignature
    # dCBWTag
    # dCBWDataTransferLength
    # bmCBWFlags
    # bCBWLUN
    # bCBWCBLength
    FMT_CBW = "<IIIBBB"

    # dCSWSignature
    # dCSWTag
    # dCSWDataResidue
    # bCSWStatus
    FMT_CSW = "<IIIB"

    CSW_STATUS_PASSED = 0
    CSW_STATUS_FAILED = 1
    CSW_STATUS_PHASE_ERROR = 2

    class SCSIError(Exception):
        """Raised when a SCSI command completes with a non-passed status

        The CSW status code is available in the 'value' attribute.
        """

        def __init__(self, error):
            Exception.__init__(self)
            self.value = error

    # Some SCSI commands
    # Value   Keil middleware define         Seagate name
    # 0x12    SCSI_INQUIRY                   INQUIRY
    # 0x23    SCSI_READ_FORMAT_CAPACITIES    Missing
    # 0x25    SCSI_READ_CAPACITY             READ CAPACITY (10)
    # 0x28    SCSI_READ10                    READ (10)
    # 0x1A    SCSI_MODE_SENSE6               MODE SENSE (6)
    # 0x00    SCSI_TEST_UNIT_READY           TEST UNIT READY
    # 0x2A    SCSI_WRITE10                   WRITE (10)
    # 0x03    SCSI_REQUEST_SENSE             REQUEST SENSE
    # 0x1E    SCSI_MEDIA_REMOVAL             Missing

    def __init__(self, device):
        """Find the MSD interface and its endpoints on 'device'

        Asserts that the active configuration of 'device' has exactly one
        interface of class CLASS_MSD and that this interface has exactly
        one IN and one OUT endpoint.
        """
        self._dev = device
        self._if = None
        self.ep_in = None
        self.ep_out = None
        self._locked = False
        self._cbw_tag = 0
        # Timeout handed to the endpoint read/write calls of the data and
        # status phases
        self.timeout = 60 * 1000
        # delays are for testing only
        self.delay_cbw_to_data = 0
        self.delay_data_to_csw = 0

        # Find interface
        for interface in device.get_active_configuration():
            if interface.bInterfaceClass == USBMsd.CLASS_MSD:
                assert self._if is None
                self._if = interface
        assert self._if is not None

        # Find endpoints
        for endpoint in self._if:
            # Bit 7 of the endpoint address is the direction bit (set = IN)
            if endpoint.bEndpointAddress & 0x80:
                assert self.ep_in is None
                self.ep_in = endpoint
            else:
                assert self.ep_out is None
                self.ep_out = endpoint
        assert self.ep_in is not None
        assert self.ep_out is not None

    def lock(self):
        """Acquire exclusive access to MSD"""
        assert not self._locked

        num = self._if.bInterfaceNumber
        # Detaching the kernel driver is best effort - it may not be
        # implemented by the backend or may fail, in which case claiming
        # the interface is still attempted.
        try:
            if self._dev.is_kernel_driver_active(num):
                self._dev.detach_kernel_driver(num)
        except NotImplementedError:
            pass
        except usb.core.USBError:
            pass
        usb.util.claim_interface(self._dev, num)
        self._locked = True

    def unlock(self):
        """Release exclusive access to MSD"""
        assert self._locked

        num = self._if.bInterfaceNumber
        usb.util.release_interface(self._dev, num)
        # Re-attaching the kernel driver is best effort, same as in lock()
        try:
            self._dev.attach_kernel_driver(num)
        except NotImplementedError:
            pass
        except usb.core.USBError:
            pass
        self._locked = False

    @staticmethod
    def _build_rw10_cdb(opcode, lba, block_count):
        """Return the 10 byte command block shared by READ 10 and WRITE 10

        Byte 0 is the opcode, bytes 2-5 the big endian logical block
        address and bytes 7-8 the big endian block count.  Values which
        do not fit are truncated to the low 32 / 16 bits.
        """
        cbwcb = bytearray(10)
        cbwcb[0] = opcode
        cbwcb[2:6] = struct.pack(">I", lba & 0xFFFFFFFF)
        cbwcb[7:9] = struct.pack(">H", block_count & 0xFFFF)
        return cbwcb

    def scsi_read10(self, lba, block_count):
        """Send the SCSI read 10 command and return the data read

        Reads 'block_count' blocks of 512 bytes starting at block 'lba'.
        Raises SCSIError if the command status is not CSW_STATUS_PASSED.
        """
        block_size = 512

        cbwcb = self._build_rw10_cdb(0x28, lba, block_count)
        ret, data = self._msd_transfer(cbwcb, 0, block_count * block_size)
        if ret != self.CSW_STATUS_PASSED:
            raise self.SCSIError(ret)
        return data

    def scsi_write10(self, lba, data):
        """Send the SCSI write 10 command

        Writes 'data', whose length must be a multiple of 512 bytes,
        starting at block 'lba'.  Raises SCSIError if the command status
        is not CSW_STATUS_PASSED.
        """
        block_size = 512

        assert len(data) % block_size == 0
        block_count = (len(data) + (block_size - 1)) // block_size

        cbwcb = self._build_rw10_cdb(0x2A, lba, block_count)
        ret, _ = self._msd_transfer(cbwcb, 0, data)
        if ret != self.CSW_STATUS_PASSED:
            raise self.SCSIError(ret)

    def scsi_test_unit_ready(self):
        """Send the SCSI test unit ready command and return status"""
        cbwcb = bytearray(10)
        cbwcb[0] = 0
        ret, _ = self._msd_transfer(cbwcb, 0)
        return ret

    def _msd_transfer(self, cbwcb, lun, size_or_data=None):
        """Perform a bulk only transfer

        cbwcb        - command block, 1 to 16 bytes
        lun          - logical unit number placed in the CBW
        size_or_data - a number requests a device to host transfer of
                       that many bytes, a buffer is sent to the device,
                       None means there is no data phase

        Returns a (csw_status, data) tuple.  'data' is None unless data
        was read from the device.
        """
        assert self._locked
        assert 1 <= len(cbwcb) <= 16

        # Increment packet tag
        transfer_tag = self._cbw_tag
        self._cbw_tag = (self._cbw_tag + 1) & 0xFFFFFFFF

        # None means data size of zero
        if size_or_data is None:
            size_or_data = 0

        in_transfer = isinstance(size_or_data, numbers.Number)
        transfer_size = (size_or_data if in_transfer else len(size_or_data))
        assert in_transfer or len(size_or_data) > 0

        # Phase - Command transport
        cbw_signature = 0x43425355
        cbw_tag = transfer_tag
        cbw_data_transfer_length = transfer_size
        cbw_flags = (1 << 7) if in_transfer else 0
        cbw_lun = lun
        cbw_length = len(cbwcb)
        params = [cbw_signature, cbw_tag, cbw_data_transfer_length,
                  cbw_flags, cbw_lun, cbw_length]
        cbw = struct.pack(self.FMT_CBW, *params)
        # The command block field is always sent as 16 bytes
        pad_size = 16 - len(cbwcb)
        payload = cbw + cbwcb + bytearray(pad_size)
        self.ep_out.write(payload)

        if self.delay_cbw_to_data != 0:
            time.sleep(self.delay_cbw_to_data)

        # Phase - Data Out or Data In (Optional)
        data = None
        if transfer_size > 0:
            endpoint = self.ep_in if in_transfer else self.ep_out
            try:
                if in_transfer:
                    data = endpoint.read(transfer_size, self.timeout)
                else:
                    endpoint.write(size_or_data, self.timeout)
            except usb.core.USBError:
                # Presumably the device stalled the endpoint to end the
                # data phase early - clear the halt and carry on to the
                # status phase, which reports the result.
                endpoint.clear_halt()

        if self.delay_data_to_csw != 0:
            time.sleep(self.delay_data_to_csw)

        # Phase - Status Transport
        csw = self.ep_in.read(13, self.timeout)
        csw_signature, csw_tag, csw_data_residue, csw_status = \
            struct.unpack(self.FMT_CSW, csw)
        assert csw_signature == 0x53425355
        assert csw_tag == transfer_tag
        #TODO - check residue
        return (csw_status, data)


class Struct(object):
    """Base class for a C structure

    Unpacks a little endian, unpadded structure from raw bytes and gives
    access to the fields by name with the [] operator.
    """

    def __init__(self, name, structure, data):
        """Unpack 'data' according to 'structure'

        name      - name of the structure, used by __str__
        structure - sequence of (field name, struct format) pairs
        data      - raw bytes, only the leading bytes covered by the
                    structure are used
        """
        field_list = [field[0] for field in structure]
        fmt_list = [field[1] for field in structure]
        format_str = "<" + "".join(fmt_list)
        struct_size = struct.calcsize(format_str)
        value_list = struct.unpack(format_str, data[:struct_size])
        # Build the dict without a loop variable called 'name' - reusing
        # that identifier used to clobber the 'name' argument so that
        # self.name ended up as the last field name.
        self.name = name
        self.format_str = format_str
        self.field_list = field_list
        self.value_dict = dict(zip(field_list, value_list))
        self.size = struct_size

    def __getitem__(self, key):
        return self.value_dict[key]

    def __setitem__(self, key, value):
        self.value_dict[key] = value

    def __str__(self):
        lines = [self.name + ":" + os.linesep]
        for field in self.field_list:
            value = self.value_dict[field]
            # Show byte strings as a list of integers
            if isinstance(value, bytes):
                value = list(bytearray(value))
            lines.append((" %s=%s" + os.linesep) % (field, value))
        return "".join(lines)

    def pack(self):
        """Return a byte representation of this structure"""
        value_list = [self.value_dict[field] for field in self.field_list]
        return struct.pack(self.format_str, *value_list)


class MBR(Struct):
    """Wrapper class for a FAT MBR"""

    STRUCTURE = (
        ("BS_jmpBoot", "3s"),
        ("BS_OEMName", "8s"),
        ("BPB_BytsPerSec", "H"),
        ("BPB_SecPerClus", "B"),
        ("BPB_RsvdSecCnt", "H"),
        ("BPB_NumFATs", "B"),
        ("BPB_RootEntCnt", "H"),
        ("BPB_TotSec16", "H"),
        ("BPB_Media", "B"),
        ("BPB_FATSz16", "H"),
        ("BPB_SecPerTrk", "H"),
        ("BPB_NumHeads", "H"),
        ("BPB_HiddSec", "L"),
        ("BPB_TotSec32", "L"),
    )

    def __init__(self, data, sector=None):
        """Unpack the MBR from 'data' and remember the sector it came from"""
        Struct.__init__(self, "MBR", self.STRUCTURE, data)
        self.sector = sector


class DirectoryEntry(Struct):
    """Wrapper class for a FAT DirectoryEntry"""

    STRUCTURE = (
        ("DIR_Name", "11s"),
        ("DIR_Attr", "B"),
        ("DIR_NTRes", "B"),
        ("DIR_CrtTimeTenth", "B"),
        ("DIR_CrtTime", "H"),
        ("DIR_CrtDate", "H"),
        ("DIR_LstAccDate", "H"),
        ("DIR_FstClusHI", "H"),
        ("DIR_WrtTime", "H"),
        ("DIR_WrtDate", "H"),
        ("DIR_FstClusLO", "H"),
        ("DIR_FileSize", "L"),
    )

    def __init__(self, data):
        """Unpack a single directory entry from 'data'"""
        Struct.__init__(self, "DirectoryEntry", self.STRUCTURE, data)


class Directory(object):
    """Wrapper class for a FAT Directory"""

    ENTRY_SIZE = 32

    def __init__(self, entry_count, data, sector=None):
        """Unpack 'entry_count' consecutive directory entries from 'data'

        'sector' is stored as-is so the caller can remember where the
        directory was read from.
        """
        size = self.ENTRY_SIZE
        self.directory_list = [
            DirectoryEntry(data[i * size:(i + 1) * size])
            for i in range(entry_count)
        ]
        self.sector = sector

    def __iter__(self):
        return iter(self.directory_list)

    def __getitem__(self, key):
        return self.directory_list[key]

    def find_free_entry_index(self):
        """Find a free index in this Directory or return None"""
        for idx, directory in enumerate(self.directory_list):
            name_data = bytearray(directory["DIR_Name"])
            # An entry counts as free when its name starts with 0x00 or 0xE5
            if name_data[0] in (0x00, 0xE5):
                return idx
        return None

    def pack(self):
        """Return a byte representation of this Directory"""
        data = bytearray()
        for directory in self.directory_list:
            data.extend(directory.pack())
        return data


class Fat(object):
    """Wrapper class for a FAT filesystem on a SCSI device"""

    SECTOR_SIZE = 512
    CLUSTER_SIZE = 4 * 1024

    def __init__(self, msd):
        """Read the filesystem from 'msd', an object providing scsi_read10"""
        self.msd = msd
        self.reload()

    def reload(self):
        """Reload all internal data of this Fat filesystem"""

        # Read MBR
        mbr_data = self.msd.scsi_read10(0, 1)
        mbr = MBR(mbr_data, 0)

        # Read in the root directory - it follows the reserved sectors
        # and all copies of the FAT
        root_dir_sec = (mbr["BPB_RsvdSecCnt"] +
                        (mbr["BPB_NumFATs"] * mbr["BPB_FATSz16"]))
        # Number of sectors holding the root directory, rounded up
        root_dir_size = mbr["BPB_RootEntCnt"] * Directory.ENTRY_SIZE
        sec_count = ((root_dir_size + self.SECTOR_SIZE - 1) //
                     self.SECTOR_SIZE)
        root_dir_data = self.msd.scsi_read10(root_dir_sec, sec_count)
        root_dir = Directory(mbr["BPB_RootEntCnt"], root_dir_data,
                             root_dir_sec)
        self.mbr = mbr
        self.root_dir = root_dir
Generated on Tue Jul 12 2022 15:37:26 by
 1.7.2
 1.7.2