Optimize fat_tool.py with batch allocation and streaming copy

This commit is contained in:
2026-04-18 01:09:13 +01:00
parent 99a0538f1e
commit 1a1a606793
+32 -16
View File
@@ -150,18 +150,33 @@ class Fat32:
write_le32(self.fat, cluster * 4, value & 0x0FFFFFFF) write_le32(self.fat, cluster * 4, value & 0x0FFFFFFF)
def _alloc_cluster(self): def _alloc_cluster(self):
"""Allocate a single free cluster. Caller must flush FAT."""
fat_entries = len(self.fat) // 4 fat_entries = len(self.fat) // 4
limit = min(self.max_cluster + 1, fat_entries) limit = min(self.max_cluster + 1, fat_entries)
for i in range(2, limit): for i in range(2, limit):
if read_le32(self.fat, i * 4) == 0: if read_le32(self.fat, i * 4) == 0:
self._set_fat(i, END_OF_CHAIN) self._set_fat(i, END_OF_CHAIN)
self._flush_fat()
self.f.seek(self._cluster_offset(i))
self.f.write(b"\x00" * self.cluster_size)
self.f.flush()
return i return i
raise RuntimeError("FAT32: no free clusters") raise RuntimeError("FAT32: no free clusters")
def _alloc_clusters(self, count):
"""Batch-allocate `count` free clusters in a single FAT scan."""
fat_entries = len(self.fat) // 4
limit = min(self.max_cluster + 1, fat_entries)
clusters = []
for i in range(2, limit):
if read_le32(self.fat, i * 4) == 0:
self._set_fat(i, END_OF_CHAIN)
clusters.append(i)
if len(clusters) == count:
break
if len(clusters) < count:
# Roll back partial allocations
for c in clusters:
self._set_fat(c, 0)
raise RuntimeError(f"FAT32: only {len(clusters)}/{count} free clusters available")
return clusters
def _cluster_chain(self, start): def _cluster_chain(self, start):
if start < 2: if start < 2:
return [] return []
@@ -529,6 +544,7 @@ class Fat32:
continue continue
new_cluster = self._alloc_cluster() new_cluster = self._alloc_cluster()
self._flush_fat()
try: try:
self._initialize_directory(new_cluster, current_cluster) self._initialize_directory(new_cluster, current_cluster)
self._add_dir_entry(current_cluster, part, new_cluster, True) self._add_dir_entry(current_cluster, part, new_cluster, True)
@@ -538,8 +554,7 @@ class Fat32:
current_cluster = new_cluster current_cluster = new_cluster
def cp_in(self, host_path, fat_path): def cp_in(self, host_path, fat_path):
with open(host_path, "rb") as host_file: file_size = os.path.getsize(host_path)
data = host_file.read()
parts = [part for part in fat_path.replace("\\", "/").split("/") if part] parts = [part for part in fat_path.replace("\\", "/").split("/") if part]
if not parts: if not parts:
@@ -563,35 +578,36 @@ class Fat32:
if existing["is_dir"]: if existing["is_dir"]:
raise RuntimeError(f"cp-in: '{file_name}' is a directory") raise RuntimeError(f"cp-in: '{file_name}' is a directory")
cluster_count = (len(data) + self.cluster_size - 1) // self.cluster_size cluster_count = (file_size + self.cluster_size - 1) // self.cluster_size
clusters = [] clusters = []
try: try:
for _ in range(cluster_count): if cluster_count > 0:
clusters.append(self._alloc_cluster()) clusters = self._alloc_clusters(cluster_count)
if clusters:
for i in range(len(clusters) - 1): for i in range(len(clusters) - 1):
self._set_fat(clusters[i], clusters[i + 1]) self._set_fat(clusters[i], clusters[i + 1])
self._set_fat(clusters[-1], END_OF_CHAIN) self._set_fat(clusters[-1], END_OF_CHAIN)
self._flush_fat() self._flush_fat()
zero_buf = bytearray(self.cluster_size)
with open(host_path, "rb") as host_file:
for index, cluster in enumerate(clusters): for index, cluster in enumerate(clusters):
chunk = data[index * self.cluster_size : (index + 1) * self.cluster_size] chunk = host_file.read(self.cluster_size)
buffer = bytearray(self.cluster_size) buf = zero_buf if len(chunk) == self.cluster_size else bytearray(self.cluster_size)
buffer[: len(chunk)] = chunk buf[:len(chunk)] = chunk
self._write_cluster(cluster, buffer) self._write_cluster(cluster, buf)
first_cluster = clusters[0] if clusters else 0 first_cluster = clusters[0] if clusters else 0
if existing is not None: if existing is not None:
replacement = self._build_short_entry( replacement = self._build_short_entry(
existing["short_bytes"], first_cluster, False, len(data) existing["short_bytes"], first_cluster, False, file_size
) )
self.f.seek(existing["entry_offset"]) self.f.seek(existing["entry_offset"])
self.f.write(replacement) self.f.write(replacement)
if existing["first_cluster"] >= 2: if existing["first_cluster"] >= 2:
self._free_cluster_chain(existing["first_cluster"]) self._free_cluster_chain(existing["first_cluster"])
else: else:
self._add_dir_entry(parent_cluster, file_name, first_cluster, False, len(data)) self._add_dir_entry(parent_cluster, file_name, first_cluster, False, file_size)
self.f.flush() self.f.flush()
os.fsync(self.f.fileno()) os.fsync(self.f.fileno())
except Exception: except Exception: