Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions workers/openrelik-worker-yara/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
fdisk \
qemu-utils \
ntfs-3g \
ewf-tools \
btrfs-progs \
&& rm -rf /var/lib/apt/lists/*

# Configure debugging
Expand Down
2 changes: 1 addition & 1 deletion workers/openrelik-worker-yara/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ requires-python = ">=3.11,<4.0"
readme = "README.md"
dependencies = [
"celery[redis]>=5.4.0,<6",
"openrelik-worker-common>=0.17.1,<1.0.0",
"openrelik-worker-common>=0.17.3,<1.0.0",
"openrelik-common>=0.7.0,<1.0.0",
]

Expand Down
203 changes: 186 additions & 17 deletions workers/openrelik-worker-yara/src/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,41 @@ class YaraMatch:
score: int


@dataclass
class SkippedInput:
    """Record of an input that was excluded from the Yara scan.

    Attributes:
        input_name: Name shown for the skipped input in logs and the report.
        reason: Human-readable explanation of why the input was skipped.
    """

    input_name: str
    reason: str


def validate_scan_target(path: str, display_name: str = "input") -> None:
    """Reject scan targets that would silently scan the worker container.

    Args:
        path: Filesystem path of the candidate scan target.
        display_name: Name used for the target in the unsupported-target error.

    Raises:
        RuntimeError: If ``path`` is empty, resolves to a filesystem root, does
            not exist, or is neither a regular file nor a directory.
    """
    if not path:
        raise RuntimeError("Input file path is empty.")

    # Resolve symlinks and redundant separators before the root check.
    # os.path.abspath("//") stays "//" on POSIX and a symlink pointing at "/"
    # keeps its own name, so comparing abspath() with os.path.sep would let
    # both through and the whole container filesystem would be scanned.
    try:
        resolved = os.path.realpath(path)
    except (OSError, ValueError):
        # Unresolvable paths (e.g. embedded NUL) fall through to the
        # existence check below, which reports them as RuntimeError.
        resolved = os.path.abspath(path)

    # A filesystem root is the only path that is its own parent directory.
    if os.path.dirname(resolved) == resolved:
        raise RuntimeError(
            "Refusing to scan filesystem root as Yara input. "
            "Select a file or folder input instead."
        )
    if not os.path.exists(path):
        raise RuntimeError(f"Input path does not exist: {path}")
    if not (os.path.isfile(path) or os.path.isdir(path)):
        # Exists but is something else (device node, FIFO, socket, ...).
        raise RuntimeError(
            f"Unsupported Yara scan target: {display_name}. "
            "The Yara worker can scan regular files and directories. "
            "Disk images must be mounted before scanning."
        )


def send_progress(task, status: str, progress: str | None = None) -> None:
"""Send task progress text for the OpenRelik UI."""
data = {"status": status}
if progress:
data["progress"] = progress
task.send_event("task-progress", data=data)


def cleanup_fraken_output_log(logfile: OutputFile) -> None:
"""Cleanup fraken-x output to be one entry per line.

Expand Down Expand Up @@ -133,16 +168,24 @@ def cleanup_fraken_output_log(logfile: OutputFile) -> None:
json.dump(extracted_dicts, f)


def generate_report_from_matches(matches: list[YaraMatch]) -> Report:
def generate_report_from_matches(
matches: list[YaraMatch], skipped_inputs: list[SkippedInput] | None = None
) -> Report:
"""Generate a report from Yara matches.

Args:
matches: List of YaraMatch objects.
skipped_inputs: List of skipped input files and the reason they were skipped.

Returns:
Report object.
"""
skipped_inputs = skipped_inputs or []
report = Report("Yara scan report")
report.summary = f"{len(matches)} Yara match(es) found."
if skipped_inputs:
report.summary += f" {len(skipped_inputs)} input(s) skipped."

matches_section = report.add_section()
matches_section.add_paragraph("List of Yara matches found in the scanned files.")
if matches:
Expand All @@ -162,6 +205,17 @@ def generate_report_from_matches(matches: list[YaraMatch]) -> Report:

matches_section.add_table(match_table)

if skipped_inputs:
skipped_section = report.add_section()
skipped_section.add_header("Skipped inputs")
skipped_section.add_paragraph(
"Inputs that were not scanned because they were not valid scan targets."
)
skipped_table = MarkdownTable(["input", "reason"])
for skipped_input in skipped_inputs:
skipped_table.add_row([skipped_input.input_name, skipped_input.reason])
skipped_section.add_table(skipped_table)

return report


Expand Down Expand Up @@ -195,6 +249,7 @@ def command(
telemetry.add_attribute_to_current_span("workflow_id", workflow_id)

output_files = []
task_files = []

all_patterns = ""
global_yara = task_config.get("Global Yara rules", "")
Expand Down Expand Up @@ -253,9 +308,17 @@ def command(
fraken_output = create_output_file(
output_path, display_name="fraken_out.jsonl", data_type="yara:yara-scan:jsonl"
)
fraken_stderr = create_output_file(
output_path,
display_name="fraken_stderr.log",
data_type="yara:yara-scan:log",
)
output_files.append(fraken_output.to_dict())
task_files.append(fraken_stderr.to_dict())

input_files = get_input_files(pipe_result, input_files)
input_files = get_input_files(pipe_result, input_files or [])
if not input_files:
raise RuntimeError("No input files were provided to Yara scan.")

input_files_map = {}
for input_file in input_files:
Expand All @@ -266,46 +329,151 @@ def command(
disks_mounted = []
try:
folders_and_files = []
skipped_inputs = []
bd = None
for input_file in input_files:
if "path" not in input_file:
skipped_inputs.append(
SkippedInput(
input_file.get("display_name", "UNKNOWN FILE"),
"Input does not have a path.",
)
)
logger.warning(
"Skipping file %s as it does not have an path", input_file
)
continue

input_file_path = input_file.get("path")
display_name = input_file.get("display_name", input_file_path)
try:
validate_scan_target(input_file_path, display_name)
except RuntimeError as e:
skipped_inputs.append(SkippedInput(display_name, str(e)))
logger.error(
"Skipping Yara scan input %s (%s): %s",
display_name,
input_file_path,
str(e),
)
continue

# Check if disk image, mount and add mountpoints to scan
if mount_disk_images and is_disk_image(input_file):
bd = BlockDevice(input_file_path, min_partition_size=1)
bd.setup()
mountpoints = bd.mount()
disks_mounted.append(bd)
if is_disk_image(input_file):
if not mount_disk_images:
reason = (
"Disk image input is not supported in regular scan mode. "
"Enable mount_disk_images to scan files inside supported "
"disk image filesystems."
)
skipped_inputs.append(SkippedInput(display_name, reason))
logger.error(
"%s: %s",
display_name,
reason,
)
continue

try:
send_progress(self, "Mounting disk image", display_name)
bd = BlockDevice(input_file_path, min_partition_size=1)
bd.setup()
disks_mounted.append(bd)
mountpoints = bd.mount()
send_progress(
self,
"Mounted disk image",
f"{display_name}: {len(mountpoints)} mountpoint(s)",
)
except RuntimeError as e:
logger.error(
"Error mounting disk image %s (%s): %s",
display_name,
input_file_path,
str(e),
)
skipped_inputs.append(
SkippedInput(
display_name,
"Disk image input is not supported or could not be "
"mounted by the Yara worker.",
)
)
continue

if not mountpoints:
logger.info(
skipped_inputs.append(
SkippedInput(
input_file.get("display_name", input_file_path),
"No mountpoints returned for input file.",
)
)
logger.error(
"No mountpoints returned for input file %s",
input_file.get("display_name"),
)
continue
for mountpoint in mountpoints:
try:
validate_scan_target(mountpoint, display_name)
except RuntimeError as e:
skipped_inputs.append(
SkippedInput(f"{display_name}: {mountpoint}", str(e))
)
logger.error(
"Skipping Yara scan mountpoint %s for %s: %s",
mountpoint,
display_name,
str(e),
)
continue
folders_and_files.append("--folder")
folders_and_files.append(mountpoint)
else:
folders_and_files.append("--folder")
folders_and_files.append(input_file_path)

if not folders_and_files:
message = "No scan targets were produced from input files."
if skipped_inputs:
last_skipped_input = skipped_inputs[-1]
message = (
f"{message} Last skipped input error: "
f"{last_skipped_input.input_name}: {last_skipped_input.reason}"
)
raise RuntimeError(message)

cmd = ["fraken"] + folders_and_files + [f"{all_yara.path}"]
logger.info(
"fraken-x scan targets: %s",
folders_and_files[1::2],
)
logger.debug(f"fraken-x command: {cmd}")
with open(fraken_output.path, "w+", encoding="utf-8") as log:
self.send_event("task-progress")
process = subprocess.Popen(cmd, stdout=log, stderr=subprocess.PIPE)
process.wait()
if process.stderr:
# Note: fraken-x uses the eprintln! Rust macro to print progress,
# this outputs to stderr....
logger.info(f"fraken-x: {process.stderr.readlines()}")
with (
open(fraken_output.path, "w+", encoding="utf-8") as log,
open(fraken_stderr.path, "w+", encoding="utf-8") as stderr_log,
):
send_progress(self, "Running Yara scan")
process = subprocess.run(
cmd,
stdout=log,
stderr=stderr_log,
check=False,
text=True,
)

if os.path.getsize(fraken_stderr.path) > 0:
logger.info(f"fraken-x stderr written to {fraken_stderr.path}")

if process.returncode != 0:
raise RuntimeError(
"An error occurred while running fraken-x. "
f"Exit code: {process.returncode}. "
f"See stderr log for details: {fraken_stderr.path}"
)
except RuntimeError as e:
logger.error("Error encountered: %s", str(e))
raise
finally:
for blockdevice in disks_mounted:
if blockdevice:
Expand Down Expand Up @@ -334,7 +502,7 @@ def command(

cleanup_fraken_output_log(fraken_output)

report = generate_report_from_matches(all_matches)
report = generate_report_from_matches(all_matches, skipped_inputs)
report_file = create_output_file(
output_path,
display_name="yara-scan-report.md",
Expand All @@ -347,6 +515,7 @@ def command(

return create_task_result(
output_files=output_files,
task_files=task_files,
workflow_id=workflow_id,
command="fraken",
task_report=report.to_dict(),
Expand Down
Loading