From 53176246d724da0cb98a9d3d7dedbab1e90c1bed Mon Sep 17 00:00:00 2001 From: spop3000 Date: Tue, 2 Jun 2026 09:31:47 +0000 Subject: [PATCH 1/2] adding ugrep task to grep worker --- workers/grep/Dockerfile | 1 + workers/grep/README.md | 6 +- workers/grep/src/__init__.py | 0 workers/grep/src/app.py | 9 +- workers/grep/src/{tasks.py => task_grep.py} | 0 workers/grep/src/task_ugrep.py | 156 ++++++++++++++++++++ workers/grep/tests/__init__.py | 0 workers/grep/tests/test_ugrep.py | 63 ++++++++ 8 files changed, 233 insertions(+), 2 deletions(-) delete mode 100644 workers/grep/src/__init__.py rename workers/grep/src/{tasks.py => task_grep.py} (100%) create mode 100644 workers/grep/src/task_ugrep.py delete mode 100644 workers/grep/tests/__init__.py create mode 100644 workers/grep/tests/test_ugrep.py diff --git a/workers/grep/Dockerfile b/workers/grep/Dockerfile index 21f962e..02028ed 100644 --- a/workers/grep/Dockerfile +++ b/workers/grep/Dockerfile @@ -8,6 +8,7 @@ RUN echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selectio # Install uv and any other dependency that your worker needs. RUN apt-get update && apt-get install -y --no-install-recommends \ curl \ + ugrep \ # Add your dependencies here && rm -rf /var/lib/apt/lists/* diff --git a/workers/grep/README.md b/workers/grep/README.md index 685a934..ebe052f 100644 --- a/workers/grep/README.md +++ b/workers/grep/README.md @@ -1,6 +1,10 @@ # OpenRelik Worker for Grep -This repository contains the OpenRelik worker designed to run grep on files to search for patterns. +This repository contains the OpenRelik worker designed to search for patterns in files. + +It provides two tasks for two different matching engines: +- Traditional Unix `grep` +- More recent `ugrep` with support for fuzzy search, archive decompression and other features ### Installation diff --git a/workers/grep/src/__init__.py b/workers/grep/src/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/workers/grep/src/app.py b/workers/grep/src/app.py index 15e9f9e..4e196b8 100644 --- a/workers/grep/src/app.py +++ b/workers/grep/src/app.py @@ -23,5 +23,12 @@ start_debugger() REDIS_URL = os.getenv("REDIS_URL") or "redis://localhost:6379/0" -celery = Celery(broker=REDIS_URL, backend=REDIS_URL, include=["src.tasks"]) +celery = Celery( + broker=REDIS_URL, + backend=REDIS_URL, + include=["src.task_grep", "src.task_ugrep"], + worker_hijack_root_logger=False, # Disable Celery hijacking configured Python loggers. + worker_log_format="%(message)s", + worker_task_log_format="%(message)s", +) redis_client = redis.Redis.from_url(REDIS_URL) diff --git a/workers/grep/src/tasks.py b/workers/grep/src/task_grep.py similarity index 100% rename from workers/grep/src/tasks.py rename to workers/grep/src/task_grep.py diff --git a/workers/grep/src/task_ugrep.py b/workers/grep/src/task_ugrep.py new file mode 100644 index 0000000..7911333 --- /dev/null +++ b/workers/grep/src/task_ugrep.py @@ -0,0 +1,156 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import subprocess + +from openrelik_worker_common.file_utils import create_output_file, count_file_lines +from openrelik_worker_common.task_utils import create_task_result, get_input_files + +import datetime +import time + +from .app import celery + +TASK_NAME = "openrelik-worker-grep.tasks.ugrep" + +TASK_METADATA = { + "display_name": "Ugrep", + "description": "Search for patterns in a file using ugrep. Supports regexp, fuzzy search, searching inside archives, and more.", + "task_config": [ + { + "name": "pattern", + "label": "", + "description": "Pattern to search for (defaults to extended regular expression)", + "type": "text", + "required": True, + }, + { + "name": "invert-match", + "label": "invert match", + "description": "Selected lines are those not matching any of the specified patterns.", + "type": "checkbox", + "required": True, + "default_value": False, + }, + { + "name": "stats", + "label": "stats", + "description": "Output statistics on the number of files and directories searched and matches found.", + "type": "checkbox", + "required": True, + "default_value": True, + }, + { + "name": "json_output", + "label": "JSON output", + "description": "Output file matches in JSON.", + "type": "checkbox", + "required": True, + "default_value": False, + }, + { + "name": "decompress", + "label": "search archives", + "description": "Search compressed files and archives.", + "type": "checkbox", + "required": True, + "default_value": False, + }, + ], +} + + +@celery.task(bind=True, name=TASK_NAME, metadata=TASK_METADATA) +def command( + self, + pipe_result: str = None, + input_files: list = None, + output_path: str = None, + workflow_id: str = None, + task_config: dict = None, +) -> str: + """Run grep on input files. + + Args: + pipe_result: Base64-encoded result from the previous Celery task, if any. + input_files: List of input file dictionaries (unused if pipe_result exists). + output_path: Path to the output directory. + workflow_id: ID of the workflow. + task_config: User configuration for the task. + + Returns: + Base64-encoded dictionary containing task results. + """ + input_files = get_input_files(pipe_result, input_files or []) + output_files = [] + base_command = prepare_base_command(task_config) + base_command_string = " ".join(base_command) + + output_extension = ".ugrep.json" if task_config.get("json_output") else ".ugrep" + + for input_file in input_files: + output_file = create_output_file( + output_path, display_name=input_file.get("display_name") + output_extension + ) + command = base_command + [input_file.get("path")] + + with open(output_file.path, "w") as fh: + process = subprocess.Popen(command, stdout=fh) + start_time = datetime.datetime.now() + update_interval_s = 3 + + while process.poll() is None: + grep_matches = count_file_lines(output_file.path) + duration = datetime.datetime.now() - start_time + rate = ( + int(grep_matches / duration.total_seconds()) + if duration.total_seconds() > 0 + else 0 + ) + self.send_event( + "task-progress", + data={"extracted_strings": grep_matches, "rate": rate}, + ) + time.sleep(update_interval_s) + + output_files.append(output_file.to_dict()) + + if not output_files: + raise RuntimeError("Ugrep task yielded no results") + + return create_task_result( + output_files=output_files, + workflow_id=workflow_id, + command=base_command_string, + meta={}, + ) + + +def prepare_base_command(task_config): + base_command = ["ugrep"] + + if task_config.get("stats"): + base_command.append("--stats") + if task_config.get("json_output"): + base_command.append("--json") + if task_config.get("decompress"): + base_command.append("--decompress") + + # Pattern options + if task_config.get("invert-match"): + base_command.append("--invert-match") + base_command.append(task_config.get("pattern")) + + base_command.append("--") + return base_command diff --git a/workers/grep/tests/__init__.py b/workers/grep/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/workers/grep/tests/test_ugrep.py b/workers/grep/tests/test_ugrep.py new file mode 100644 index 0000000..47569a4 --- /dev/null +++ b/workers/grep/tests/test_ugrep.py @@ -0,0 +1,63 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.import pytest + +"""Tests ugrep task.""" + +from src.task_ugrep import prepare_base_command + +def test_prepare_base_command_basic(): + """Test prepare_base_command with just a pattern.""" + task_config = { + "pattern": "my_pattern" + } + expected = ["ugrep", "my_pattern", "--"] + assert prepare_base_command(task_config) == expected + +def test_prepare_base_command_all_flags(): + """Test prepare_base_command with all flags enabled.""" + task_config = { + "stats": True, + "json_output": True, + "decompress": True, + "invert-match": True, + "pattern": "test_pattern" + } + expected = [ + "ugrep", + "--stats", + "--json", + "--decompress", + "--invert-match", + "test_pattern", + "--" + ] + assert prepare_base_command(task_config) == expected + +def test_prepare_base_command_some_flags(): + """Test prepare_base_command with a subset of flags enabled.""" + task_config = { + "stats": False, + "json_output": True, + "decompress": False, + "invert-match": True, + "pattern": "another_pattern" + } + expected = [ + "ugrep", + "--json", + "--invert-match", + "another_pattern", + "--" + ] + assert prepare_base_command(task_config) == expected From 4a9c84d6ba8c9b573b3d62a46d0bdc3054b4d9c2 Mon Sep 17 00:00:00 2001 From: spop3000 Date: Tue, 2 Jun 2026 10:42:46 +0000 Subject: [PATCH 2/2] fix PYTHONPATH so tests can run --- workers/grep/pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/workers/grep/pyproject.toml b/workers/grep/pyproject.toml index bead065..c3be915 100644 --- a/workers/grep/pyproject.toml +++ b/workers/grep/pyproject.toml @@ -18,6 +18,9 @@ test = ["pytest", "pytest-cov>=5.0.0,<6", "pytest-mock>=3.14.0,<4"] package = false default-groups = ["test"] +[tool.pytest.ini_options] +pythonpath = ["."] + [build-system] requires = ["hatchling"] build-backend = "hatchling.build"