Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions workers/grep/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ RUN echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selectio
# Install uv and any other dependency that your worker needs.
RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
ugrep \
# Add your dependencies here
&& rm -rf /var/lib/apt/lists/*

Expand Down
6 changes: 5 additions & 1 deletion workers/grep/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# OpenRelik Worker for Grep

This repository contains the OpenRelik worker designed to run grep on files to search for patterns.
This repository contains the OpenRelik worker designed to search for patterns in files.

It provides two tasks for two different matching engines:
- Traditional Unix `grep`
- More recent `ugrep` with support for fuzzy search, archive decompression and other features

### Installation

Expand Down
3 changes: 3 additions & 0 deletions workers/grep/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ test = ["pytest", "pytest-cov>=5.0.0,<6", "pytest-mock>=3.14.0,<4"]
package = false
default-groups = ["test"]

[tool.pytest.ini_options]
pythonpath = ["."]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
Empty file removed workers/grep/src/__init__.py
Empty file.
9 changes: 8 additions & 1 deletion workers/grep/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,12 @@
start_debugger()

REDIS_URL = os.getenv("REDIS_URL") or "redis://localhost:6379/0"
celery = Celery(broker=REDIS_URL, backend=REDIS_URL, include=["src.tasks"])
celery = Celery(
broker=REDIS_URL,
backend=REDIS_URL,
include=["src.task_grep", "src.task_ugrep"],
worker_hijack_root_logger=False, # Disable Celery hijacking configured Python loggers.
worker_log_format="%(message)s",
worker_task_log_format="%(message)s",
)
redis_client = redis.Redis.from_url(REDIS_URL)
File renamed without changes.
156 changes: 156 additions & 0 deletions workers/grep/src/task_ugrep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import subprocess

from openrelik_worker_common.file_utils import create_output_file, count_file_lines
from openrelik_worker_common.task_utils import create_task_result, get_input_files

import datetime
import time

from .app import celery

TASK_NAME = "openrelik-worker-grep.tasks.ugrep"

TASK_METADATA = {
"display_name": "Ugrep",
"description": "Search for patterns in a file using ugrep. Supports regexp, fuzzy search, searching inside archives, and more.",
"task_config": [
{
"name": "pattern",
"label": "",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add label.

"description": "Pattern to search for (defaults to extended regular expression)",
"type": "text",
"required": True,
},
{
"name": "invert-match",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make it invert_match to be consistent with other workers.

"label": "invert match",
"description": "Selected lines are those not matching any of the specified patterns.",
"type": "checkbox",
"required": True,
"default_value": False,
},
{
"name": "stats",
"label": "stats",
"description": "Output statistics on the number of files and directories searched and matches found.",
"type": "checkbox",
"required": True,
"default_value": True,
},
{
"name": "json_output",
"label": "JSON output",
"description": "Output file matches in JSON.",
"type": "checkbox",
"required": True,
"default_value": False,
},
{
"name": "decompress",
"label": "search archives",
"description": "Search compressed files and archives.",
"type": "checkbox",
"required": True,
"default_value": False,
},
],
}


@celery.task(bind=True, name=TASK_NAME, metadata=TASK_METADATA)
def command(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self,
pipe_result: str = None,
input_files: list = None,
output_path: str = None,
workflow_id: str = None,
task_config: dict = None,
) -> str:
"""Run grep on input files.

Args:
pipe_result: Base64-encoded result from the previous Celery task, if any.
input_files: List of input file dictionaries (unused if pipe_result exists).
output_path: Path to the output directory.
workflow_id: ID of the workflow.
task_config: User configuration for the task.

Returns:
Base64-encoded dictionary containing task results.
"""
input_files = get_input_files(pipe_result, input_files or [])
output_files = []
base_command = prepare_base_command(task_config)
base_command_string = " ".join(base_command)

output_extension = ".ugrep.json" if task_config.get("json_output") else ".ugrep"

for input_file in input_files:
output_file = create_output_file(
output_path, display_name=input_file.get("display_name") + output_extension
)
command = base_command + [input_file.get("path")]

with open(output_file.path, "w") as fh:
process = subprocess.Popen(command, stdout=fh)
start_time = datetime.datetime.now()
update_interval_s = 3

while process.poll() is None:
grep_matches = count_file_lines(output_file.path)
duration = datetime.datetime.now() - start_time
rate = (
int(grep_matches / duration.total_seconds())
if duration.total_seconds() > 0
else 0
)
self.send_event(
"task-progress",
data={"extracted_strings": grep_matches, "rate": rate},
)
time.sleep(update_interval_s)

output_files.append(output_file.to_dict())

if not output_files:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will never execute, there will always be output_files. I would

  • remove this
    OR
  • check if there is indeed output from ugrep and if not generate a logger.warning() message that ugrep produced no output instead of a RuntimeError.

Removing this seems fine imo.

raise RuntimeError("Ugrep task yielded no results")

return create_task_result(
output_files=output_files,
workflow_id=workflow_id,
command=base_command_string,
meta={},
)


def prepare_base_command(task_config):
base_command = ["ugrep"]

if task_config.get("stats"):
base_command.append("--stats")
if task_config.get("json_output"):
base_command.append("--json")
if task_config.get("decompress"):
base_command.append("--decompress")

# Pattern options
if task_config.get("invert-match"):
base_command.append("--invert-match")
base_command.append(task_config.get("pattern"))

base_command.append("--")
return base_command
Empty file removed workers/grep/tests/__init__.py
Empty file.
63 changes: 63 additions & 0 deletions workers/grep/tests/test_ugrep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.import pytest

"""Tests ugrep task."""

from src.task_ugrep import prepare_base_command

def test_prepare_base_command_basic():
"""Test prepare_base_command with just a pattern."""
task_config = {
"pattern": "my_pattern"
}
expected = ["ugrep", "my_pattern", "--"]
assert prepare_base_command(task_config) == expected

def test_prepare_base_command_all_flags():
"""Test prepare_base_command with all flags enabled."""
task_config = {
"stats": True,
"json_output": True,
"decompress": True,
"invert-match": True,
"pattern": "test_pattern"
}
expected = [
"ugrep",
"--stats",
"--json",
"--decompress",
"--invert-match",
"test_pattern",
"--"
]
assert prepare_base_command(task_config) == expected

def test_prepare_base_command_some_flags():
"""Test prepare_base_command with a subset of flags enabled."""
task_config = {
"stats": False,
"json_output": True,
"decompress": False,
"invert-match": True,
"pattern": "another_pattern"
}
expected = [
"ugrep",
"--json",
"--invert-match",
"another_pattern",
"--"
]
assert prepare_base_command(task_config) == expected
Loading