-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathimage_parser.py
More file actions
143 lines (116 loc) · 4.35 KB
/
Copy pathimage_parser.py
File metadata and controls
143 lines (116 loc) · 4.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""
image_parser.py — Forensic image mounting and filesystem traversal.
Supports: E01 (libewf), DD (raw), AFF4; filesystems: NTFS, ext4, APFS.
"""
import os
import pytsk3
import pyewf
import logging
from dataclasses import dataclass, field
from typing import Iterator, Optional
from config import cfg
logger = logging.getLogger(__name__)
@dataclass
class FileEntry:
    """Record describing one directory entry found while walking an image.

    Field order is significant: the directory walker in this module builds
    instances positionally.
    """

    # Path inside the image, built by joining entry names with "/".
    path: str
    # Size taken from the entry's metadata; 0 when no metadata is available.
    size: int
    # Metadata address of the entry; 0 when no metadata is available.
    inode: int
    # Creation / modification / access timestamps, or None when unset.
    # NOTE(review): presumably seconds since the Unix epoch — confirm.
    created: Optional[float]
    modified: Optional[float]
    accessed: Optional[float]
    # True when the entry's metadata carries the "unallocated" flag.
    is_deleted: bool
    # Short filesystem label such as "NTFS", "ext4", "APFS" or "unknown".
    fs_type: str
    # Content bytes (may be truncated by the reader); kept out of repr()
    # so that logging an entry does not dump file contents.
    data: bytes = field(repr=False, default=b"")
class EWFImgInfo(pytsk3.Img_Info):
    """Adapter: libewf handle → pytsk3 image source.

    pytsk3 is told the image is of an "external" type, so it delegates all
    byte access to the ``read`` / ``get_size`` / ``close`` methods below,
    which forward to the wrapped libewf handle.
    """

    def __init__(self, ewf_handle):
        """Wrap an already-opened libewf handle.

        Args:
            ewf_handle: Handle exposing ``seek``, ``read``, ``get_media_size``
                and ``close``.
        """
        # The handle is stored before the base initializer runs.
        # NOTE(review): presumably because the base class may call back into
        # read()/get_size() during construction — confirm.
        self._ewf = ewf_handle
        super().__init__(url="", type=pytsk3.TSK_IMG_TYPE_EXTERNAL)

    def close(self):
        """Release the underlying libewf handle.

        Without this override nothing ever closes the handle that was opened
        for this adapter, leaking the segment files' descriptors.
        """
        self._ewf.close()

    def read(self, offset, length):
        """Return up to ``length`` bytes starting at byte ``offset``."""
        self._ewf.seek(offset)
        return self._ewf.read(length)

    def get_size(self):
        """Return the media size reported by the libewf handle."""
        return self._ewf.get_media_size()
def _open_image(image_path: str) -> pytsk3.Img_Info:
ext = os.path.splitext(image_path)[1].lower()
if ext in (".e01", ".ex01"):
filenames = pyewf.glob(image_path)
handle = pyewf.handle()
handle.open(filenames)
return EWFImgInfo(handle)
elif ext in (".dd", ".raw", ".img", ".aff4"):
# AFF4 and DD both expose a raw block device; pytsk3 reads them natively
return pytsk3.Img_Info(image_path)
else:
raise ValueError(f"Unsupported image format: {ext}")
def _ts(tsk_time) -> Optional[float]:
try:
return float(tsk_time) if tsk_time else None
except Exception:
return None
def _walk_directory(
    fs: pytsk3.FS_Info,
    directory,
    parent_path: str,
    fs_type: str,
    _ancestors: frozenset = frozenset(),
) -> Iterator[FileEntry]:
    """Recursively yield a ``FileEntry`` for everything under ``directory``.

    Traversal is depth-first: an entry is yielded first, and if it is a
    directory its contents follow immediately. Content and traversal
    failures are best-effort: they are logged at debug level and the walk
    continues with the next entry.

    Args:
        fs: Filesystem the directory belongs to; used to open file content.
        directory: Iterable directory object whose entries are walked.
        parent_path: Path prefix for the entries ("" for the root).
        fs_type: Filesystem label copied into every yielded entry.
        _ancestors: Internal. Metadata addresses of the directories already
            descended into on the current path. A directory whose address
            is in this set is yielded but not descended into again, so a
            filesystem containing a directory loop cannot recurse forever.
            The starting directory itself is not tracked.

    Yields:
        One ``FileEntry`` per entry, excluding "." and "..".
    """
    for entry in directory:
        name = entry.info.name.name
        if isinstance(name, bytes):
            name = name.decode("utf-8", errors="replace")
        if name in (".", ".."):
            continue

        path = f"{parent_path}/{name}"
        meta = entry.info.meta

        if meta:
            # int() normalises the flags value before the bitmask.
            # NOTE(review): pytsk3 may expose flags as an enum-like object
            # rather than a plain int — confirm.
            is_deleted = bool(int(meta.flags) & pytsk3.TSK_FS_META_FLAG_UNALLOC)
            size = meta.size
            inode = meta.addr
            created = _ts(meta.crtime)
            modified = _ts(meta.mtime)
            accessed = _ts(meta.atime)
            is_regular = meta.type == pytsk3.TSK_FS_META_TYPE_REG
            is_directory = meta.type == pytsk3.TSK_FS_META_TYPE_DIR
        else:
            # No metadata: report the name with neutral values.
            is_deleted = False
            size = 0
            inode = 0
            created = modified = accessed = None
            is_regular = is_directory = False

        data = b""
        if is_regular and size > 0:
            try:
                handle = fs.open_meta(inode=inode)
                # Cap the read so very large files do not exhaust memory.
                data = handle.read_random(0, min(size, cfg.max_file_read_bytes))
            except Exception as exc:
                # Best-effort: keep the entry, just without content.
                logger.debug(
                    "Could not read content of %s (inode %s): %s", path, inode, exc
                )

        yield FileEntry(
            path, size, inode, created, modified, accessed, is_deleted, fs_type, data
        )

        if not is_directory:
            continue

        if inode in _ancestors:
            # This directory is one of its own ancestors: descending again
            # would loop until the recursion limit is hit.
            logger.debug("Skipping directory loop at %s (inode %s)", path, inode)
            continue

        try:
            sub_dir = entry.as_directory()
            yield from _walk_directory(
                fs, sub_dir, path, fs_type, _ancestors | {inode}
            )
        except Exception as exc:
            # Best-effort: abandon this subtree and continue with siblings.
            logger.debug("Could not traverse directory %s: %s", path, exc)
def parse_image(image_path: str) -> Iterator[FileEntry]:
    """
    Mount a forensic image and yield FileEntry objects for every file.
    Handles partition tables; auto-detects NTFS / ext4 / APFS per partition.

    Args:
        image_path: Path to the image file; the format is chosen from its
            extension.

    Yields:
        A ``FileEntry`` for each entry found, partition by partition.

    Raises:
        ValueError: If the image extension is not supported.
        Exception: When the image has no usable partition table and the
            whole image cannot be opened as a single filesystem, the
            underlying error propagates. Per-partition failures are only
            logged.
    """
    img = _open_image(image_path)

    # Sector size used to turn a partition's start sector into a byte
    # offset. 512 is only the fallback; the value reported by the volume
    # system is preferred so images with larger sectors are read at the
    # right offset.
    sector_size = 512
    try:
        volume = pytsk3.Volume_Info(img)
        partitions = list(volume)
    except Exception as e:
        logger.debug("No volume system found in %s: %s", image_path, e)
        partitions = None
    else:
        sector_size = getattr(volume.info, "block_size", 0) or sector_size

    if partitions:
        for part in partitions:
            if part.desc and b"Unallocated" in part.desc:
                continue
            try:
                fs = pytsk3.FS_Info(img, offset=part.start * sector_size)
                fs_type = _detect_fs(fs)
                root = fs.open_dir("/")
                yield from _walk_directory(fs, root, "", fs_type)
            except Exception as e:
                # Slots without a recognisable filesystem end up here too.
                logger.debug("Skipping partition %s: %s", part.addr, e)
    else:
        # No partition table — treat whole image as single filesystem
        fs = pytsk3.FS_Info(img)
        fs_type = _detect_fs(fs)
        root = fs.open_dir("/")
        yield from _walk_directory(fs, root, "", fs_type)
def _detect_fs(fs: pytsk3.FS_Info) -> str:
    """Return a short label for the filesystem type reported by ``fs``.

    Returns:
        "NTFS", "ext4" (used for the EXT2, EXT3 and EXT4 type codes alike),
        "APFS", or "unknown" for any other type code.
    """
    labels = {}
    labels[pytsk3.TSK_FS_TYPE_NTFS] = "NTFS"
    # The whole ext family is reported under a single label.
    for ext_code in (
        pytsk3.TSK_FS_TYPE_EXT2,
        pytsk3.TSK_FS_TYPE_EXT3,
        pytsk3.TSK_FS_TYPE_EXT4,
    ):
        labels[ext_code] = "ext4"
    labels[pytsk3.TSK_FS_TYPE_APFS] = "APFS"

    detected = fs.info.ftype
    return labels.get(detected, "unknown")