-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfatdefrag.py
More file actions
1125 lines (991 loc) · 44.9 KB
/
fatdefrag.py
File metadata and controls
1125 lines (991 loc) · 44.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
fatdefrag.py — a pure-Python FAT16/FAT32 defragmenter for *disk images* (no external disk utilities).
⚠️ WARNING
- This operates directly on a disk image file. Back up your image first.
- Supports MBR (including EBR chains). GPT/protective MBR is not supported.
- Targets FAT16 and FAT32 only (not FAT12/exFAT).
- Conservative strategy: compacts files toward the start of the data area using
only free space; skips files when no suitable contiguous run exists.
New in this complete version
----------------------------
- **Progress bars** (overall and per-file) without extra dependencies (no tqdm required).
- **Stricter integrity checker**: compares FAT copies, detects cross-links, orphans,
truncated chains, and FSInfo inconsistencies, plus a fragmentation summary.
- **Safer workflow**: strict pre-check runs by default when `--check` is used; you can
`--force` to continue despite errors, and the tool re-checks after defrag.
Usage examples
--------------
List partitions in a disk image (and exit):
python fatdefrag.py /path/to/disk.img --list
Start, list partitions, choose interactively, and defrag in place:
python fatdefrag.py /path/to/disk.img --inplace --check
Non-interactive, defrag partition index N:
python fatdefrag.py /path/to/disk.img -p N --inplace --check
Dry-run to preview moves (no writes):
python fatdefrag.py /path/to/disk.img -p N --dry-run --verbose --check
Make a backup first:
python fatdefrag.py /path/to/disk.img -p N --backup disk.img.bak --inplace --check
"""
from __future__ import annotations
import argparse
import hashlib
import io
import os
import struct
import sys
from dataclasses import dataclass, field
from typing import List, Tuple, Optional, Dict, Iterable
# ------------------------- MBR / Partition parsing -------------------------
# Classic MBR layout: four 16-byte partition entries start at byte 446,
# and the 0x55AA boot signature sits at byte 510 of the sector.
MBR_PART_OFFSET = 446
MBR_ENTRY_SIZE = 16
MBR_SIGNATURE_OFFSET = 510
MBR_SIGNATURE = b"\x55\xAA"
# MBR partition-type IDs this tool treats as FAT16/FAT32 volumes.
FAT_TYPES = {
    0x04: "FAT16 (CHS)",
    0x06: "FAT16 (LBA)",
    0x0E: "FAT16 (LBA, CHS alt)",
    0x0B: "FAT32 (CHS)",
    0x0C: "FAT32 (LBA)",
}
# Extended-partition container types (CHS and LBA variants).
EXT_TYPES = {0x05, 0x0F}
# Type 0xEE is the protective MBR in front of a GPT disk (unsupported here).
GPT_PROTECTIVE = 0xEE
@dataclass
class Part:
    """One partition-table entry (primary or logical) discovered in the image."""
    index: int                   # display index used for partition selection
    part_type: int               # raw MBR type byte
    lba_start: int               # absolute start sector (LBA)
    sectors: int                 # length in 512-byte sectors
    is_extended: bool = False    # True for extended-container entries
    description: str = ""        # human-readable type name, filled after parsing
    parent_ext_base: int = 0     # for logical partitions: LBA of the extended base

    def __str__(self) -> str:
        """One-line summary used by the partition listing."""
        kind = FAT_TYPES.get(self.part_type, f"type 0x{self.part_type:02X}")
        # Size math assumes 512-byte sectors, as the rest of the tool does.
        size_mb = (self.sectors * 512) / (1024 * 1024)
        ext = " (extended)" if self.is_extended else ""
        return f"[{self.index}] LBA {self.lba_start:<10} size {size_mb:>8.1f} MiB {kind}{ext}"
# ------------------------- Low-level I/O helpers -------------------------
def read_at(f: io.BufferedRandom, offset: int, size: int) -> bytes:
    """Seek to `offset` and read exactly `size` bytes; raise IOError on a short read."""
    f.seek(offset)
    chunk = f.read(size)
    if len(chunk) != size:
        raise IOError("Short read")
    return chunk
def write_at(f: io.BufferedRandom, offset: int, data: bytes) -> None:
    """Seek to `offset` and write all of `data`; raise IOError if the write is truncated."""
    f.seek(offset)
    written = f.write(data)
    if written != len(data):
        raise IOError("Short write")
# ------------------------- Progress utilities -------------------------
class Progress:
    """Minimal dependency-free progress bar rendered to stderr."""

    def __init__(self, total: int, prefix: str = "") -> None:
        self.total = max(int(total), 0)
        self.current = 0
        self.prefix = prefix
        self._width = 40
        self._last_render = ""

    def _render(self) -> str:
        """Build the single-line bar string for the current state."""
        if self.total <= 0:
            # Degenerate case: nothing to count.
            return f"{self.prefix} [" + "-" * self._width + "] 0% (0/0)"
        ratio = min(self.current / self.total, 1.0)
        filled = int(self._width * ratio)
        bar = "[{}{}]".format("#" * filled, "-" * (self._width - filled))
        return f"{self.prefix} {bar} {int(ratio * 100):3d}% ({self.current}/{self.total})"

    def update(self, n: int = 1) -> None:
        """Advance by `n` steps (clamped to total) and repaint only when the line changed."""
        self.current += n
        if self.current > self.total:
            self.current = self.total
        line = "\r" + self._render()
        if line != self._last_render:
            sys.stderr.write(line)
            sys.stderr.flush()
            self._last_render = line

    def close(self) -> None:
        """Finish the bar with a trailing newline."""
        sys.stderr.write("\r" + self._render() + "\n")
        sys.stderr.flush()
# ------------------------- MBR / EBR parsing -------------------------
def parse_mbr_partitions(f: io.BufferedRandom) -> List[Part]:
    """Parse the MBR partition table (and any EBR chains) of a disk image.

    Returns every non-empty primary and logical partition as a `Part`.
    Raises ValueError for a missing 0x55AA signature or a GPT protective MBR.

    Fix: the EBR walk previously had no cycle protection, so a corrupted or
    malicious image whose next-EBR links form a loop would hang the tool
    forever; visited EBR offsets are now tracked and a revisit ends the walk.
    """
    mbr = read_at(f, 0, 512)
    if mbr[MBR_SIGNATURE_OFFSET:MBR_SIGNATURE_OFFSET+2] != MBR_SIGNATURE:
        raise ValueError("Invalid MBR signature; GPT or raw volume not supported.")
    parts: List[Part] = []

    def parse_entry(raw: bytes, idx: int) -> Part:
        # Type byte at +4; start LBA and sector count are little-endian u32s.
        ptype = raw[4]
        lba_start = struct.unpack_from("<I", raw, 8)[0]
        sectors = struct.unpack_from("<I", raw, 12)[0]
        return Part(index=idx, part_type=ptype, lba_start=lba_start, sectors=sectors,
                    is_extended=(ptype in EXT_TYPES))

    primaries: List[Part] = []
    for i in range(4):
        entry = mbr[MBR_PART_OFFSET + i*MBR_ENTRY_SIZE: MBR_PART_OFFSET + (i+1)*MBR_ENTRY_SIZE]
        p = parse_entry(entry, i)
        if p.sectors == 0:
            continue  # empty table slot
        if p.part_type == GPT_PROTECTIVE:
            raise ValueError("GPT/protective MBR detected; not supported.")
        primaries.append(p)
    parts.extend(primaries)
    # Follow extended partitions (EBR chain)
    for p in primaries:
        if not p.is_extended:
            continue
        ext_base = p.lba_start
        next_ebr_rel = 0  # the first EBR sits at the extended partition's base
        visited_ebrs = set()  # guard against circular EBR chains in corrupt images
        while True:
            if next_ebr_rel in visited_ebrs:
                break
            visited_ebrs.add(next_ebr_rel)
            ebr_lba = ext_base + next_ebr_rel
            ebr = read_at(f, ebr_lba*512, 512)
            if ebr[MBR_SIGNATURE_OFFSET:MBR_SIGNATURE_OFFSET+2] != MBR_SIGNATURE:
                break
            # Entry 1 describes the logical partition; its start is relative to this EBR.
            e1 = ebr[MBR_PART_OFFSET:MBR_PART_OFFSET+MBR_ENTRY_SIZE]
            lp = Part(index=len(parts), part_type=e1[4],
                      lba_start=ebr_lba + struct.unpack_from("<I", e1, 8)[0],
                      sectors=struct.unpack_from("<I", e1, 12)[0],
                      is_extended=False, parent_ext_base=ext_base)
            if lp.sectors:
                parts.append(lp)
            # Entry 2 links to the next EBR, relative to ext_base (0 terminates).
            e2 = ebr[MBR_PART_OFFSET+MBR_ENTRY_SIZE:MBR_PART_OFFSET+2*MBR_ENTRY_SIZE]
            next_rel = struct.unpack_from("<I", e2, 8)[0]
            if next_rel == 0:
                break
            next_ebr_rel = next_rel
    for p in parts:
        p.description = FAT_TYPES.get(p.part_type, f"0x{p.part_type:02X}")
    return parts
# ------------------------- FAT structures & helpers -------------------------
@dataclass
class BPB:
    """BIOS Parameter Block fields decoded from a FAT volume's boot sector."""
    byts_per_sec: int
    sec_per_clus: int
    rsvd_sec_cnt: int
    num_fats: int
    root_ent_cnt: int
    tot_sec_16: int
    media: int
    fatsz_16: int
    sec_per_trk: int
    num_heads: int
    hidd_sec: int
    tot_sec_32: int
    # FAT32-only extension fields; left at 0 on FAT16 volumes.
    fatsz_32: int = 0
    ext_flags: int = 0
    fsver: int = 0
    root_clus: int = 0
    fsinfo: int = 0
    bkbootsec: int = 0
    fat_type: str = ""   # "FAT12"/"FAT16"/"FAT32", derived from cluster count
    fat_bits: int = 0

    @staticmethod
    def parse(boot: bytes) -> 'BPB':
        """Decode the BPB from a 512-byte boot sector and classify the FAT type."""
        # Fixed little-endian field offsets per the FAT on-disk layout.
        byts_per_sec = struct.unpack_from("<H", boot, 11)[0]
        sec_per_clus = boot[13]
        rsvd_sec_cnt = struct.unpack_from("<H", boot, 14)[0]
        num_fats = boot[16]
        root_ent_cnt = struct.unpack_from("<H", boot, 17)[0]
        tot_sec_16 = struct.unpack_from("<H", boot, 19)[0]
        media = boot[21]
        fatsz_16 = struct.unpack_from("<H", boot, 22)[0]
        sec_per_trk = struct.unpack_from("<H", boot, 24)[0]
        num_heads = struct.unpack_from("<H", boot, 26)[0]
        hidd_sec = struct.unpack_from("<I", boot, 28)[0]
        tot_sec_32 = struct.unpack_from("<I", boot, 32)[0]
        fatsz_32 = ext_flags = fsver = root_clus = fsinfo = bkbootsec = 0
        if fatsz_16 == 0:
            # fatsz_16 == 0 signals a FAT32 BPB with the extended fields present.
            fatsz_32 = struct.unpack_from("<I", boot, 36)[0]
            ext_flags = struct.unpack_from("<H", boot, 40)[0]
            fsver = struct.unpack_from("<H", boot, 42)[0]
            root_clus = struct.unpack_from("<I", boot, 44)[0]
            fsinfo = struct.unpack_from("<H", boot, 48)[0]
            bkbootsec = struct.unpack_from("<H", boot, 50)[0]
        fatsz = fatsz_16 if fatsz_16 else fatsz_32
        root_dir_sectors = ((root_ent_cnt * 32) + (byts_per_sec - 1)) // byts_per_sec
        tot_sec = tot_sec_16 if tot_sec_16 else tot_sec_32
        data_sectors = tot_sec - (rsvd_sec_cnt + num_fats * fatsz + root_dir_sectors)
        count_of_clusters = data_sectors // sec_per_clus if sec_per_clus else 0
        # FAT type is determined solely by the data-cluster count
        # (thresholds 4085 and 65525).
        if count_of_clusters < 4085:
            ftype, fbits = "FAT12", 12
        elif count_of_clusters < 65525:
            ftype, fbits = "FAT16", 16
        else:
            ftype, fbits = "FAT32", 32
        return BPB(byts_per_sec, sec_per_clus, rsvd_sec_cnt, num_fats, root_ent_cnt,
                   tot_sec_16, media, fatsz_16, sec_per_trk, num_heads, hidd_sec,
                   tot_sec_32, fatsz_32, ext_flags, fsver, root_clus, fsinfo,
                   bkbootsec, ftype, fbits)
@dataclass
class DirEntry:
    """One parsed directory entry (file or subdirectory), LFN-aware."""
    name: str            # display name (assembled LFN if present, else 8.3)
    attr: int            # raw attribute byte
    start_cluster: int   # first data cluster (0 for empty files)
    size: int            # file size in bytes (0 for directories)
    is_dir: bool
    raw_offset: int      # absolute byte offset of the 32-byte slot in the image
def _fat_entry_cluster_field_offsets(fat_type: str) -> Tuple[int, ...]:
if fat_type == "FAT32":
return (20, 21, 26, 27)
return (26, 27)
# Directory-entry attribute flags.
ATTR_READ_ONLY = 0x01
ATTR_HIDDEN = 0x02
ATTR_SYSTEM = 0x04
ATTR_VOLUME_ID = 0x08
ATTR_DIRECTORY = 0x10
ATTR_ARCHIVE = 0x20
# READ_ONLY|HIDDEN|SYSTEM|VOLUME_ID together mark a long-file-name entry.
ATTR_LFN = 0x0F
# FAT-entry sentinel values: end-of-chain, free, and bad-cluster markers
# for the 16-bit and (28-bit) 32-bit FAT widths.
EOC16 = 0xFFF8
EOC32 = 0x0FFFFFF8
FREE16 = 0x0000
FREE32 = 0x00000000
BAD16 = 0xFFF7
BAD32 = 0x0FFFFFF7
@dataclass
class FatVolume:
    """An opened FAT16/FAT32 volume plus its in-memory FAT table.

    All sector math is absolute (image-relative), derived from the partition
    start LBA and the BPB. The FAT is loaded once into `fat` and written back
    to every on-disk FAT copy by `flush_fat()`.
    """
    f: io.BufferedRandom          # underlying image file
    part: Part                    # partition this volume lives in
    bpb: BPB
    byts_per_sec: int
    sec_per_clus: int
    first_data_sector: int        # absolute sector of cluster #2
    first_fat_sector: int         # absolute sector of FAT copy 0
    root_dir_first_sector: int    # FAT16 fixed root area (0 on FAT32)
    root_dir_sectors: int
    fatsz: int                    # sectors per FAT copy
    eoc: int                      # end-of-chain marker for this FAT width
    free_value: int               # free-cluster marker
    fat_bits: int
    max_cluster: int              # highest valid cluster index
    fat: List[int] = field(default_factory=list)

    @staticmethod
    def open(f: io.BufferedRandom, part: Part) -> 'FatVolume':
        """Parse and validate the boot sector of `part`, then load the FAT."""
        boot = read_at(f, part.lba_start * 512, 512)
        if boot[510:512] != b"\x55\xAA":
            raise ValueError("Invalid boot sector signature for FAT volume.")
        bpb = BPB.parse(boot)
        if bpb.fat_type not in ("FAT16", "FAT32"):
            raise ValueError(f"Unsupported FAT type: {bpb.fat_type}")
        byts_per_sec = bpb.byts_per_sec
        sec_per_clus = bpb.sec_per_clus
        fatsz = bpb.fatsz_16 if bpb.fatsz_16 else bpb.fatsz_32
        first_fat_sector = part.lba_start + bpb.rsvd_sec_cnt
        root_dir_sectors = ((bpb.root_ent_cnt * 32) + (byts_per_sec - 1)) // byts_per_sec
        if bpb.fat_type == "FAT32":
            # FAT32 keeps the root directory in an ordinary cluster chain.
            root_dir_first_sector = 0
        else:
            root_dir_first_sector = first_fat_sector + bpb.num_fats * fatsz
        first_data_sector = first_fat_sector + bpb.num_fats * fatsz + root_dir_sectors
        tot_sec = bpb.tot_sec_16 if bpb.tot_sec_16 else bpb.tot_sec_32
        data_sectors = tot_sec - (bpb.rsvd_sec_cnt + bpb.num_fats * fatsz + root_dir_sectors)
        cluster_count = data_sectors // sec_per_clus if sec_per_clus else 0
        max_cluster = cluster_count + 1  # data clusters are numbered from 2
        eoc = EOC32 if bpb.fat_type == "FAT32" else EOC16
        free_value = FREE32 if bpb.fat_type == "FAT32" else FREE16
        vol = FatVolume(
            f=f, part=part, bpb=bpb, byts_per_sec=byts_per_sec, sec_per_clus=sec_per_clus,
            first_data_sector=first_data_sector, first_fat_sector=first_fat_sector,
            root_dir_first_sector=root_dir_first_sector, root_dir_sectors=root_dir_sectors,
            fatsz=fatsz, eoc=eoc, free_value=free_value, fat_bits=bpb.fat_bits,
            max_cluster=max_cluster,
        )
        vol.load_fat()
        return vol

    def first_sector_of_cluster(self, n: int) -> int:
        """Absolute sector number of the first sector of cluster `n`."""
        if n < 2 or n > self.max_cluster:
            raise ValueError(f"Cluster index out of data range: {n}")
        return self.first_data_sector + (n - 2) * self.sec_per_clus

    def cluster_offset_bytes(self, n: int) -> int:
        """Absolute byte offset of cluster `n` within the image."""
        return self.first_sector_of_cluster(n) * self.byts_per_sec

    def load_fat(self) -> None:
        """Read FAT copy 0 into `self.fat` as a list of ints.

        FAT32 entries are masked to 28 bits (top 4 bits are reserved on disk).
        """
        fat_bytes = read_at(self.f, self.first_fat_sector * self.byts_per_sec, self.fatsz * self.byts_per_sec)
        if self.bpb.fat_type == "FAT16":
            count = len(fat_bytes) // 2
            self.fat = list(struct.unpack_from(f"<{count}H", fat_bytes, 0))
        else:
            count = len(fat_bytes) // 4
            vals = list(struct.unpack_from(f"<{count}I", fat_bytes, 0))
            self.fat = [v & 0x0FFFFFFF for v in vals]

    def flush_fat(self) -> None:
        """Serialize `self.fat` to every on-disk FAT copy, then flush+fsync.

        NOTE(review): since load_fat() masks to 28 bits, any reserved top bits
        that were set on disk are written back as zero — confirm acceptable.
        """
        if self.bpb.fat_type == "FAT16":
            out = struct.pack(f"<{len(self.fat)}H", *self.fat)
        else:
            vals = [v & 0x0FFFFFFF for v in self.fat]
            out = struct.pack(f"<{len(vals)}I", *vals)
        for i in range(self.bpb.num_fats):
            off = (self.first_fat_sector + i * self.fatsz) * self.byts_per_sec
            write_at(self.f, off, out)
        self.f.flush()
        os.fsync(self.f.fileno())

    def next_cluster(self, n: int) -> Optional[int]:
        """Follow the FAT link from cluster `n`.

        Returns None at end-of-chain; raises ValueError for BAD/FREE links or
        out-of-range values.
        """
        if n < 2 or n > self.max_cluster:
            raise ValueError(f"Cluster index out of range: {n}")
        v = self.fat[n]
        if self.bpb.fat_type == "FAT16":
            if v >= EOC16: return None
            if v == BAD16: raise ValueError("Encountered BAD cluster")
            if v == FREE16: raise ValueError("Encountered FREE cluster in chain")
            if v < 2 or v > self.max_cluster:
                raise ValueError(f"Next cluster out of range: {v}")
            return v
        else:
            if v >= EOC32: return None
            if v == BAD32: raise ValueError("Encountered BAD cluster")
            if v == FREE32: raise ValueError("Encountered FREE cluster in chain")
            if v < 2 or v > self.max_cluster:
                raise ValueError(f"Next cluster out of range: {v}")
            return v

    def cluster_chain(self, start: int) -> List[int]:
        """Return the complete cluster chain from `start`; raises on loops."""
        if start < 2:
            return []
        chain, seen = [], set()
        n = start
        while n is not None:
            if n in seen:
                raise ValueError("FAT loop detected")
            seen.add(n)
            chain.append(n)
            n = self.next_cluster(n)
        return chain

    def cluster_read(self, n: int) -> bytes:
        """Read one whole cluster's payload."""
        off = self.cluster_offset_bytes(n)
        size = self.sec_per_clus * self.byts_per_sec
        return read_at(self.f, off, size)

    def cluster_write(self, n: int, data: bytes) -> None:
        """Write one whole cluster; `data` must be exactly one cluster long."""
        size = self.sec_per_clus * self.byts_per_sec
        if len(data) != size:
            raise ValueError("cluster_write: wrong data size")
        off = self.cluster_offset_bytes(n)
        write_at(self.f, off, data)

    def iter_root_dir_sectors(self) -> Iterable[Tuple[int, bytes]]:
        """Yield (absolute byte offset, sector bytes) for the root directory."""
        if self.bpb.fat_type == "FAT16":
            # FAT16: fixed root directory region right after the FATs.
            first = self.root_dir_first_sector
            for i in range(self.root_dir_sectors):
                off = (first + i) * self.byts_per_sec
                yield off, read_at(self.f, off, self.byts_per_sec)
        else:
            # FAT32: root directory is an ordinary cluster chain.
            for off, sec in self.iter_dir_chain(self.bpb.root_clus):
                yield off, sec

    def iter_dir_chain(self, start_cluster: int) -> Iterable[Tuple[int, bytes]]:
        """Yield (absolute byte offset, sector bytes) for each sector of a directory chain."""
        size = self.byts_per_sec
        for clus in self.cluster_chain(start_cluster):
            first_sec = self.first_sector_of_cluster(clus)
            for i in range(self.sec_per_clus):
                off = (first_sec + i) * size
                yield off, read_at(self.f, off, size)

    def list_directory(self, start_cluster: Optional[int], base_offset: Optional[int] = None):
        """Parse a directory into DirEntry objects.

        FAT16 with start_cluster None (or <2) reads the fixed root area;
        otherwise the directory's cluster chain is walked. LFN parts are
        assembled; deleted entries, volume labels and "."/".." are skipped.
        `base_offset` is accepted but unused.
        """
        ATTR_LFN = 0x0F
        ATTR_DIRECTORY = 0x10
        ATTR_VOLUME_ID = 0x08
        entries: list[DirEntry] = []
        lfn_stack: list[bytes] = []
        stop = False

        def parse_sector(off: int, sec: bytes):
            # Scan one sector's worth of 32-byte directory slots.
            nonlocal entries, lfn_stack, stop
            for i in range(0, len(sec), 32):
                e = sec[i:i+32]
                raw_off = off + i
                name0 = e[0]
                if name0 == 0x00:
                    # End-of-directory marker: no valid entries follow in later sectors either.
                    lfn_stack.clear()
                    stop = True
                    return
                if name0 == 0xE5:
                    # deleted entry; discard any pending LFN
                    lfn_stack.clear()
                    continue
                attr = e[11]
                if attr == ATTR_LFN:
                    # long file name component
                    lfn_stack.append(e)
                    continue
                is_dir = bool(attr & ATTR_DIRECTORY)
                # Build display name
                if lfn_stack:
                    name = self._lfn_to_name(reversed(lfn_stack))
                    lfn_stack.clear()
                else:
                    base = e[0:8].decode("ascii", errors="replace").rstrip()
                    ext = e[8:11].decode("ascii", errors="replace").rstrip()
                    name = f"{base}.{ext}".rstrip(".")
                name = name.strip().strip("/\\")  # never carry trailing slashes
                # Skip volume labels and dot entries
                if attr & ATTR_VOLUME_ID:
                    continue
                if is_dir and name in (".", ".."):
                    continue
                # Start cluster: low word at +26; FAT32 adds the high word at +20.
                start_lo = struct.unpack_from("<H", e, 26)[0]
                start_hi = struct.unpack_from("<H", e, 20)[0] if self.bpb.fat_type == "FAT32" else 0
                start = (start_hi << 16) | start_lo
                size = struct.unpack_from("<I", e, 28)[0]
                entries.append(DirEntry(
                    name=name,
                    attr=attr,
                    start_cluster=start,
                    size=size,
                    is_dir=is_dir,
                    raw_offset=raw_off
                ))

        # Walk the directory sectors and parse each one
        if self.bpb.fat_type == "FAT16" and (start_cluster is None or start_cluster < 2):
            # FAT16 root directory lives in a fixed area
            for off, sec in self.iter_root_dir_sectors():
                parse_sector(off, sec)
                if stop:
                    break
        else:
            # Subdirectory (FAT16) or any directory (FAT32) via cluster chain
            if start_cluster is None or start_cluster < 2:
                # Defensive: FAT32 root should have a valid cluster; if not, return empty
                return entries
            for off, sec in self.iter_dir_chain(start_cluster):
                parse_sector(off, sec)
                if stop:
                    break
        return entries

    @staticmethod
    def _lfn_to_name(entries: Iterable[bytes]) -> str:
        """Assemble a long file name from LFN entries.

        Callers pass the on-disk stack reversed, i.e. entries arrive in
        logical name order here.
        """
        name_chars: List[str] = []
        for e in entries:
            # UCS-2 name characters live at bytes 1-10, 14-25 and 28-31.
            parts = e[1:11] + e[14:26] + e[28:32]
            for j in range(0, len(parts), 2):
                c = parts[j:j+2]
                if c in (b"\xFF\xFF", b"\x00\x00"):
                    continue  # padding / terminator code units
                name_chars.append(c.decode('utf-16le', errors='ignore'))
        return ''.join(name_chars).rstrip('\u0000').rstrip()

    def update_dir_start_cluster(self, entry: DirEntry, new_start: int) -> None:
        """Rewrite the start-cluster field(s) of `entry` in its on-disk directory sector."""
        # Read-modify-write the whole sector containing the 32-byte slot.
        sec_off = entry.raw_offset - (entry.raw_offset % self.byts_per_sec)
        sec = bytearray(read_at(self.f, sec_off, self.byts_per_sec))
        i = entry.raw_offset - sec_off
        struct.pack_into("<H", sec, i+26, new_start & 0xFFFF)  # low word
        if self.bpb.fat_type == "FAT32":
            struct.pack_into("<H", sec, i+20, (new_start >> 16) & 0xFFFF)  # high word
        write_at(self.f, sec_off, bytes(sec))

    def read_dir_start_cluster(self, entry: DirEntry) -> int:
        """Read back the start cluster currently stored in `entry`'s on-disk slot."""
        sec_off = entry.raw_offset - (entry.raw_offset % self.byts_per_sec)
        sec = read_at(self.f, sec_off, self.byts_per_sec)
        i = entry.raw_offset - sec_off
        start_lo = struct.unpack_from("<H", sec, i + 26)[0]
        start_hi = struct.unpack_from("<H", sec, i + 20)[0] if self.bpb.fat_type == "FAT32" else 0
        return (start_hi << 16) | start_lo

    def read_dir_entry_bytes(self, entry: DirEntry) -> bytes:
        """Return the raw 32-byte directory entry from disk."""
        sec_off = entry.raw_offset - (entry.raw_offset % self.byts_per_sec)
        sec = read_at(self.f, sec_off, self.byts_per_sec)
        i = entry.raw_offset - sec_off
        return sec[i:i+32]

    def update_fsinfo(self) -> None:
        """Refresh the FAT32 FSInfo sector's free-count and next-free hint from `self.fat`."""
        if self.bpb.fat_type != "FAT32" or self.bpb.fsinfo == 0:
            return
        fsinfo_sec = (self.part.lba_start + self.bpb.fsinfo) * self.byts_per_sec
        sec = bytearray(read_at(self.f, fsinfo_sec, 512))
        free = sum(1 for v in self.fat[2:self.max_cluster + 1] if v == self.free_value)
        # Next-free hint: first free cluster at or after 2; 0xFFFFFFFF if none.
        hint = 2
        while hint <= self.max_cluster and self.fat[hint] != self.free_value:
            hint += 1
        if hint > self.max_cluster:
            hint = 0xFFFFFFFF
        struct.pack_into("<I", sec, 0x1E4, free if free < 0xFFFFFFFF else 0xFFFFFFFF)
        struct.pack_into("<I", sec, 0x1E8, hint)
        write_at(self.f, fsinfo_sec, bytes(sec))
# ------------------------- Integrity checker -------------------------
class IntegrityChecker:
    """Read-only structural checks for a FAT volume.

    Compares FAT copies, walks every directory to build a cluster-usage map,
    flags cross-links / truncated / overlong / looped chains and orphaned
    clusters, sanity-checks reserved FAT entries and FSInfo, and prints a
    fragmentation summary.
    """

    def __init__(self, vol: 'FatVolume', strict: bool = True, verbose: bool = False) -> None:
        self.vol = vol
        self.strict = strict      # strict mode counts orphans as errors
        self.verbose = verbose
        self.cluster_size = vol.sec_per_clus * vol.byts_per_sec

    def _canon_dir(self, p: str) -> str:
        # Canonical directory form: exactly one trailing slash.
        return p.rstrip("/\\") + "/"

    def _cluster_chain_upto(self, start: int, max_len: int):
        """Return up to max_len clusters from the chain starting at `start`.
        Also returns flags: truncated (chain shorter than max_len), looped, overlong."""
        chain = []
        seen = set()
        n = start
        looped = False
        while n is not None and len(chain) < max_len:
            if n in seen:
                looped = True
                break
            seen.add(n)
            chain.append(n)
            try:
                n = self.vol.next_cluster(n)
            except ValueError as ex:  # BAD cluster
                raise
        truncated = len(chain) < max_len and n is None
        overlong = len(chain) == max_len and n is not None and not looped
        return chain, truncated, looped, overlong

    # POSIX join independent of host OS to avoid "\" vs "/" mismatches
    def _pjoin(self, parent: str, name: str) -> str:
        if not parent or parent == "/":
            return "/" + name.strip("/\\")
        return (parent.rstrip("/\\") + "/" + name.strip("/\\")).replace("//", "/")

    # Compare FAT copies for mismatches
    def _compare_fat_copies(self) -> Tuple[bool, List[int]]:
        """Byte-compare every FAT copy against copy 0; return (ok, mismatched indices)."""
        mismatches: List[int] = []
        if self.vol.bpb.num_fats <= 1:
            return True, mismatches
        base = read_at(self.vol.f, self.vol.first_fat_sector * self.vol.byts_per_sec, self.vol.fatsz * self.vol.byts_per_sec)
        for i in range(1, self.vol.bpb.num_fats):
            other = read_at(self.vol.f, (self.vol.first_fat_sector + i * self.vol.fatsz) * self.vol.byts_per_sec, self.vol.fatsz * self.vol.byts_per_sec)
            if other != base:
                mismatches.append(i)
        return (len(mismatches) == 0), mismatches

    # Walk filesystem once, collect used clusters, detect issues, and count fragmentation.
    def _scan_filesystem(self) -> Tuple[Dict[int, str], List[str], int, int]:
        """BFS all directories; return (cluster->owner map, problems, frag_files, total_files)."""
        used: Dict[int, str] = {}
        problems: List[str] = []
        frag_files = 0
        total_files = 0
        # BFS over directories starting at root
        q: List[Tuple[Optional[int], str]] = []
        if self.vol.bpb.fat_type == "FAT16":
            q.append((None, "/"))  # FAT16 root is the fixed area, not a chain
        else:
            q.append((self.vol.bpb.root_clus, "/"))
        seen_dirs = set()
        while q:
            start, path = q.pop(0)
            try:
                entries = self.vol.list_directory(start)
            except Exception as e:
                problems.append(f"Directory read error at {path}: {e}")
                continue
            for e in entries:
                if e.is_dir:
                    if e.name in (".", ".."):
                        continue
                    if e.start_cluster >= 2:
                        dir_path = self._pjoin(path, e.name)
                        this_dir = self._canon_dir(dir_path)
                        if e.start_cluster not in seen_dirs:
                            seen_dirs.add(e.start_cluster)
                            q.append((e.start_cluster, dir_path))
                        try:
                            for c in self.vol.cluster_chain(e.start_cluster):
                                # Compare canonically to avoid "/DOS/" vs "/DOS//"
                                if c in used and used[c].rstrip("/\\") != this_dir.rstrip("/\\"):
                                    problems.append(
                                        f"Cross-link: cluster {c} used by {used[c]} and {this_dir}"
                                    )
                                used.setdefault(c, this_dir)
                        except Exception as ex:
                            problems.append(f"Dir chain error for {dir_path}: {ex}")
                else:
                    # Regular file
                    if e.start_cluster >= 2 and e.size > 0:
                        total_files += 1
                        file_path = self._pjoin(path, e.name)
                        if e.start_cluster > self.vol.max_cluster:
                            problems.append(
                                f"Invalid first cluster for {file_path}: cluster {e.start_cluster} exceeds max {self.vol.max_cluster}"
                            )
                            continue
                        if self.vol.fat[e.start_cluster] == self.vol.free_value:
                            problems.append(f"Invalid first cluster for {file_path}: cluster {e.start_cluster} is free")
                            continue
                        # Clusters the recorded size requires (ceiling division).
                        need = (e.size + self.cluster_size - 1) // self.cluster_size
                        try:
                            chain, truncated, looped, overlong = self._cluster_chain_upto(e.start_cluster, need)
                        except Exception as ex:
                            problems.append(f"File chain error for {file_path}: {ex}")
                            continue
                        if truncated:
                            problems.append(
                                f"Truncated chain for {file_path}: needs {need} clusters, chain has {len(chain)}"
                            )
                        if looped:
                            problems.append(f"FAT loop detected in {file_path}")
                        if overlong:
                            problems.append(
                                f"Overlong chain for {file_path}: size needs {need} clusters, chain continues past file data"
                            )
                        # Fragmented = any non-consecutive link in the chain.
                        if any(chain[i] + 1 != chain[i+1] for i in range(len(chain)-1)):
                            frag_files += 1
                        for c in chain:
                            if c in used and used[c] != file_path:
                                problems.append(f"Cross-link: cluster {c} used by {used[c]} and {file_path}")
                            used.setdefault(c, file_path)
        return used, problems, frag_files, total_files

    def _collect_used_clusters(self) -> Tuple[Dict[int, str], List[str]]:
        """Convenience wrapper: only the usage map and problem list."""
        used, problems, _, _ = self._scan_filesystem()
        return used, problems

    # Any allocated cluster not referenced by a file/dir is an orphan
    def _find_orphans(self, used: Dict[int, str]) -> List[int]:
        orphans: List[int] = []
        for c in range(2, self.vol.max_cluster + 1):
            v = self.vol.fat[c]
            if v == self.vol.free_value:
                continue  # free
            if c not in used:
                orphans.append(c)
        return orphans

    # Basic sanity on reserved FAT entries
    def _check_boot_reserved(self) -> List[str]:
        issues: List[str] = []
        f0 = self.vol.fat[0]
        f1 = self.vol.fat[1]
        if self.vol.bpb.fat_type == "FAT16":
            # FAT[0]'s low byte should echo the BPB media descriptor.
            media = self.vol.bpb.media
            if (f0 & 0xFF) != media:
                issues.append(f"FAT[0] media byte mismatch: expected 0x{media:02X}, got 0x{f0 & 0xFF:02X}")
            if f1 < EOC16:
                issues.append("FAT[1] not marked EOC for FAT16")
        else:
            if f1 < EOC32:
                issues.append("FAT[1] not marked EOC for FAT32")
        return issues

    # FSInfo consistency (FAT32 only)
    def _check_fsinfo(self) -> List[str]:
        notes: List[str] = []
        if self.vol.bpb.fat_type != "FAT32" or self.vol.bpb.fsinfo == 0:
            return notes
        fsinfo_off = (self.vol.part.lba_start + self.vol.bpb.fsinfo) * self.vol.byts_per_sec
        sec = read_at(self.vol.f, fsinfo_off, 512)
        # 0xFFFFFFFF means "unknown" for both cached FSInfo fields.
        cached = struct.unpack_from("<I", sec, 0x1E4)[0]
        computed = sum(1 for v in self.vol.fat[2:self.vol.max_cluster + 1] if v == self.vol.free_value)
        if cached != 0xFFFFFFFF and cached != computed:
            notes.append(f"FSInfo free count {cached} != computed {computed}")
        hint = struct.unpack_from("<I", sec, 0x1E8)[0]
        if hint != 0xFFFFFFFF and (hint < 2 or hint > self.vol.max_cluster):
            notes.append("FSInfo next-free hint out of range")
        return notes

    # Run full integrity suite
    def run(self) -> bool:
        """Execute all checks, print findings, and return True iff no errors."""
        ok = True
        # FAT copies identical?
        fats_ok, mism = self._compare_fat_copies()
        if not fats_ok:
            ok = False
            print(f"ERROR: FAT copy mismatch detected in copies: {mism}")
        # Reserved entries heuristics
        for issue in self._check_boot_reserved():
            print("WARN:", issue)
        # Usage map + structural problems
        used, problems, frag_files, total_files = self._scan_filesystem()
        for p in problems:
            if "Cross-link" in p or "chain error" in p or "Truncated" in p or "Overlong" in p or "Invalid first cluster" in p:
                ok = False
            print(("ERROR:" if ("Cross-link" in p or "chain error" in p or "Truncated" in p or "Overlong" in p or "Invalid first cluster" in p) else "WARN:"), p)
        # Orphans (lost chains)
        orphans = self._find_orphans(used)
        if orphans:
            msg = f"Found {len(orphans)} orphan allocated clusters (lost chains)"
            print(("ERROR:" if self.strict else "WARN:"), msg)
            if self.verbose:
                print(" sample:", orphans[:20])
            if self.strict:
                ok = False
        # FSInfo notes
        for note in self._check_fsinfo():
            print("NOTE:", note)
        print(f"Summary: {frag_files}/{total_files} files are fragmented.")
        return ok
# ------------------------- Defrag planner -------------------------
@dataclass
class FilePlan:
    """Planned relocation of one file's cluster chain."""
    entry: DirEntry        # directory entry to be repointed
    old_chain: List[int]   # current (possibly fragmented) chain
    new_start: int         # first cluster of the reserved contiguous run
    new_chain: List[int]   # contiguous destination chain
class Defragmenter:
    """Plans and executes file compaction toward the start of the data area.

    Strategy: process files ordered by current first cluster; a file that is
    already contiguous at/after the moving cursor stays put, otherwise it is
    copied into the first sufficiently large free run. Files with no suitable
    run are skipped. Directories themselves are never moved.
    """

    def __init__(self, vol: FatVolume, verbose: bool=False):
        self.vol = vol
        self.verbose = verbose
        self.cluster_size = vol.sec_per_clus * vol.byts_per_sec
        self.next_free_cursor = 2  # compaction frontier: first candidate target cluster
        self.free = [i for i in range(2, vol.max_cluster + 1) if vol.fat[i] == vol.free_value]
        self.free_set = set(self.free)
        if self.verbose:
            print(f"Free clusters: {len(self.free)}")

    def _find_free_run(self, needed: int) -> Optional[int]:
        """Scan the FAT from the cursor for `needed` consecutive free clusters."""
        start = max(self.next_free_cursor, 2)
        run_start = None
        run_len = 0
        i = start
        last = self.vol.max_cluster
        while i <= last:
            if self.vol.fat[i] == self.vol.free_value:
                if run_start is None:
                    run_start = i; run_len = 1
                else:
                    if i == run_start + run_len:
                        run_len += 1
                    else:
                        run_start = i; run_len = 1
                if run_len >= needed:
                    return run_start
            i += 1
        return None

    def _reserve_run(self, start: int, length: int) -> None:
        """Mark a free run as taken so later planning cannot reuse it."""
        for i in range(start, start+length):
            if self.vol.fat[i] != self.vol.free_value:
                raise ValueError("Tried to reserve a non-free cluster")
            self.vol.fat[i] = 1  # local reservation marker; will be overwritten by links

    def _release_old_chain(self, chain: List[int]) -> None:
        """Free every cluster of a superseded source chain (in-memory FAT only)."""
        for c in chain:
            self.vol.fat[c] = self.vol.free_value

    def _write_chain_links(self, start: int, length: int) -> List[int]:
        """Link `length` consecutive clusters from `start`; last one gets EOC."""
        chain = [start + i for i in range(length)]
        for i, c in enumerate(chain):
            if i == length - 1:
                self.vol.fat[c] = self.vol.eoc
            else:
                self.vol.fat[c] = c + 1
        return chain

    def _copy_chain_data(self, src_chain: List[int], dst_chain: List[int], file_prog: Optional[Progress], overall: Optional[Progress]) -> None:
        """Copy cluster payloads pairwise from source chain to destination chain."""
        for s, d in zip(src_chain, dst_chain):
            data = self.vol.cluster_read(s)
            self.vol.cluster_write(d, data)
            if file_prog: file_prog.update(1)
            if overall: overall.update(1)

    def _required_clusters(self, entry: DirEntry) -> int:
        # Ceiling division: clusters needed to hold entry.size bytes.
        return (entry.size + self.cluster_size - 1) // self.cluster_size

    def _file_data_digest(self, entry: DirEntry, chain: List[int]) -> str:
        """SHA-1 over exactly entry.size bytes of the chain (content fingerprint)."""
        remaining = entry.size
        digest = hashlib.sha1()
        for cluster in chain:
            if remaining <= 0:
                break
            chunk = self.vol.cluster_read(cluster)
            take = min(len(chunk), remaining)
            digest.update(chunk[:take])
            remaining -= take
        if remaining != 0:
            raise ValueError(f"Digest verification length mismatch for {entry.name}")
        return digest.hexdigest()

    def _validated_file_chain(self, entry: DirEntry, start_cluster: Optional[int] = None) -> List[int]:
        """Walk and validate a file's chain: exact length for its size, no loops.

        `start_cluster` overrides the entry's recorded start (used to verify a
        freshly relocated chain).
        """
        start = entry.start_cluster if start_cluster is None else start_cluster
        if entry.size == 0:
            return []
        if start < 2:
            raise ValueError(f"Invalid start cluster {start} for {entry.name}")
        if self.vol.fat[start] == self.vol.free_value:
            raise ValueError(f"First cluster {start} is free for {entry.name}")
        need = self._required_clusters(entry)
        chain: List[int] = []
        seen = set()
        n = start
        while n is not None and len(chain) < need:
            if n in seen:
                raise ValueError(f"FAT loop detected for {entry.name}")
            seen.add(n)
            chain.append(n)
            n = self.vol.next_cluster(n)
        if len(chain) < need:
            raise ValueError(f"Truncated chain for {entry.name}: needs {need}, has {len(chain)}")
        if n is not None:
            raise ValueError(f"Overlong chain for {entry.name}: exceeds {need} clusters")
        return chain

    def _gather_files(self) -> List[DirEntry]:
        """BFS every directory and return all file entries found."""
        q: List[Tuple[Optional[int], str]] = [(None, "/")] if self.vol.bpb.fat_type == "FAT16" else [(self.vol.bpb.root_clus, "/")]
        files: List[DirEntry] = []
        seen_dirs = set()
        while q:
            start, path = q.pop(0)
            entries = self.vol.list_directory(start)
            for e in entries:
                if e.is_dir:
                    if e.name in (".", ".."):
                        continue
                    if e.start_cluster >= 2 and e.start_cluster not in seen_dirs:
                        seen_dirs.add(e.start_cluster)
                        # NOTE(review): os.path.join uses the host separator; the path
                        # is only a queue label here (unlike IntegrityChecker's _pjoin),
                        # so this looks cosmetic — confirm.
                        q.append((e.start_cluster, os.path.join(path, e.name)))
                else:
                    files.append(e)
        return files

    def plan(self) -> List[FilePlan]:
        """Decide which files move where; reserves target runs in the in-memory FAT.

        Does not write anything to disk.
        """
        plans: List[FilePlan] = []
        candidates = []
        for e in self._gather_files():
            if e.start_cluster < 2 or e.size == 0:
                continue  # empty / cluster-less files never move
            old_chain = self._validated_file_chain(e)
            candidates.append((old_chain[0], e.raw_offset, e, old_chain))
        # Stable order: by current first cluster, then directory-entry offset.
        candidates.sort(key=lambda item: (item[0], item[1]))
        for _, _, e, old_chain in candidates:
            need = self._required_clusters(e)
            contiguous = all(old_chain[i] + 1 == old_chain[i+1] for i in range(len(old_chain)-1))
            if contiguous and old_chain[0] >= self.next_free_cursor:
                # Already compact and past the frontier: leave in place.
                self.next_free_cursor = old_chain[-1] + 1
                continue
            pos = self._find_free_run(need)
            if pos is None:
                continue  # no contiguous space available; skip conservatively
            self._reserve_run(pos, need)
            new_chain = [pos + i for i in range(need)]
            plans.append(FilePlan(entry=e, old_chain=old_chain, new_start=pos, new_chain=new_chain))
            self.next_free_cursor = max(self.next_free_cursor, new_chain[-1] + 1)
        return plans

    def execute(self, plans: List[FilePlan], dry_run: bool=False, show_progress: bool=True) -> None:
        """Carry out the planned moves with per-file verification.

        For each plan: digest the source data, copy clusters, write the new
        chain links, repoint the directory entry, then verify the entry bytes
        (only start-cluster fields may differ), the new chain, and the content
        digest before freeing the old chain. Raises IOError on any failed
        verification. Flushes the FAT copies and FSInfo once at the end
        (skipped entirely in dry_run).
        """
        total_clusters = sum(len(p.new_chain) for p in plans)
        overall = Progress(total_clusters, prefix="Overall") if (show_progress and not dry_run) else None
        for i, p in enumerate(plans, 1):
            if self.verbose:
                print(f"[{i}/{len(plans)}] {p.entry.name}: {len(p.old_chain)} clu -> {len(p.new_chain)} clu at {p.new_start}")
            if dry_run:
                continue
            expected_digest = self._file_data_digest(p.entry, p.old_chain)
            before_entry = self.vol.read_dir_entry_bytes(p.entry)
            file_prog = Progress(len(p.new_chain), prefix=f"{p.entry.name[:30]:<30}") if show_progress else None
            self._copy_chain_data(p.old_chain, p.new_chain, file_prog, overall)
            if file_prog: file_prog.close()
            self._write_chain_links(p.new_start, len(p.new_chain))
            self.vol.update_dir_start_cluster(p.entry, p.new_start)
            written_start = self.vol.read_dir_start_cluster(p.entry)
            if written_start != p.new_start:
                raise IOError(f"Directory entry update failed for {p.entry.name}: expected {p.new_start}, found {written_start}")
            after_entry = self.vol.read_dir_entry_bytes(p.entry)
            # Only the start-cluster bytes of the 32-byte entry may have changed.
            allowed_offsets = set(_fat_entry_cluster_field_offsets(self.vol.bpb.fat_type))
            changed_offsets = {idx for idx, (before, after) in enumerate(zip(before_entry, after_entry)) if before != after}
            if not changed_offsets.issubset(allowed_offsets):
                raise IOError(f"Unexpected directory entry changes for {p.entry.name}: {sorted(changed_offsets)}")
            verified_chain = self._validated_file_chain(p.entry, start_cluster=p.new_start)
            if verified_chain != p.new_chain:
                raise IOError(f"Verification failed for {p.entry.name}: expected {p.new_chain}, found {verified_chain}")
            actual_digest = self._file_data_digest(p.entry, verified_chain)
            if actual_digest != expected_digest:
                raise IOError(f"Content verification failed for {p.entry.name}")
            self._release_old_chain(p.old_chain)
        if not dry_run:
            self.vol.flush_fat()
            self.vol.update_fsinfo()
        if overall: overall.close()
# ------------------------- CLI / Orchestration -------------------------
def human_size(nbytes: int) -> str:
    """Format a byte count using binary units.

    Bytes are shown as an integer ("512 B"); larger units with one decimal
    ("2.0 KiB"). Values of 1 TiB and above remain expressed in TiB.

    Fix: the original had an unreachable trailing `return f"{nbytes:.1f} B"`
    after the loop — dead code that, had it ever run, would also have used
    the wrong unit. The loop always returns at "TiB" at the latest, so the
    fallback is removed.
    """
    for unit in ["B", "KiB", "MiB", "GiB", "TiB"]:
        if nbytes < 1024 or unit == "TiB":
            return f"{nbytes} B" if unit == "B" else f"{nbytes:.1f} {unit}"
        nbytes /= 1024
def list_fat_partitions(image_path: str) -> List[Part]:
    """Return the FAT16/FAT32 partitions found in the image's partition tables.

    Fix: the original copied the *entire* disk image into an in-memory
    BytesIO buffer just to read a few table sectors, which does not scale to
    multi-gigabyte images. The file is now opened read-only and parsed in
    place; parse_mbr_partitions only needs seek/read.
    """
    with open(image_path, "rb") as f:
        parts = parse_mbr_partitions(f)
    return [p for p in parts if p.part_type in FAT_TYPES]
def print_partition_table(image_path: str) -> List[Part]:
    """Print every partition found in the image and return the parsed list.

    Fix: like list_fat_partitions, the original buffered the whole image in
    memory via BytesIO before parsing; the file is now parsed directly,
    read-only, so only the table sectors are actually read.
    """
    with open(image_path, "rb") as f:
        parts = parse_mbr_partitions(f)
    if not parts:
        print("No partitions found.")
        return []
    print("Partitions found (index, start LBA, size, type):")
    for p in parts:
        print(" ", p)
    return parts
def open_image_rw(path: str) -> io.BufferedRandom:
    """Open the disk image for in-place read/write access.

    NOTE(review): with buffering=0 a binary open() actually returns an
    unbuffered raw io.FileIO, not a BufferedRandom; callers appear to rely
    only on seek/read/write/flush/fileno, so the annotation is loose —
    confirm before tightening.
    """
    return open(path, "r+b", buffering=0)