Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 47 additions & 9 deletions workers/openrelik-worker-timesketch/src/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,34 @@
POLL_INTERVAL_SECONDS = 5


def _find_sketch_by_name(timesketch_api_client, sketch_name: str):
Comment thread
julianghill marked this conversation as resolved.
"""Return the first visible Timesketch sketch matching the given name."""
for sketch in timesketch_api_client.list_sketches():
if sketch.name == sketch_name:
return sketch
return None


def get_or_create_sketch(
timesketch_api_client,
redis_client,
sketch_id: int | str | None = None,
sketch_name: str | None = None,
reuse_existing_sketch: bool = False,
workflow_id: str | None = None,
):
"""
Retrieves or creates a sketch, handling locking if needed.
This uses Redis distributed lock to avoid race conditions.

Args:
client: Timesketch API client.
timesketch_api_client: Timesketch API client.
redis_client: Redis client.
sketch_id: ID of the sketch to retrieve.
sketch_name: Name of the sketch to create.
sketch_name: Name of the sketch to create. If reuse_existing_sketch is
enabled, an existing sketch with this name is used before creating one.
reuse_existing_sketch: Reuse an existing sketch with sketch_name instead
of always creating a new named sketch.
workflow_id: ID of the workflow.

Returns:
Expand All @@ -81,7 +93,17 @@ def get_or_create_sketch(
except ValueError:
raise ValueError(f"Sketch ID must be a number. Received: '{sketch_id}'")
elif sketch_name:
sketch = timesketch_api_client.create_sketch(sketch_name)
if reuse_existing_sketch:
with redis_client.lock(
f"timesketch-sketch-name:{sketch_name}",
timeout=60,
blocking_timeout=5,
):
sketch = _find_sketch_by_name(timesketch_api_client, sketch_name)
if not sketch:
sketch = timesketch_api_client.create_sketch(sketch_name)
else:
sketch = timesketch_api_client.create_sketch(sketch_name)
else:
sketch_name = f"openrelik-workflow-{workflow_id}"
# Prevent multiple distributed workers from concurrently creating the same
Expand All @@ -90,10 +112,7 @@ def get_or_create_sketch(
# The lock automatically expires after 60 seconds to prevent deadlocks.
with redis_client.lock(sketch_name, timeout=60, blocking_timeout=5):
# Search for an existing sketch while having the lock
for _sketch in timesketch_api_client.list_sketches():
if _sketch.name == sketch_name:
sketch = _sketch
break
sketch = _find_sketch_by_name(timesketch_api_client, sketch_name)

# If not found, create a new one
if not sketch:
Expand All @@ -119,11 +138,24 @@ def get_or_create_sketch(
},
{
"name": "sketch_name",
"label": "Name of the new sketch to create",
"description": "Create a new sketch",
"label": "Name of the sketch",
"description": (
"By default, create a new sketch with this name. Enable reuse "
"by name to use the first existing sketch with this name instead."
),
"type": "text",
"required": False,
},
{
"name": "reuse_existing_sketch",
"label": "Reuse existing sketch by name",
"description": (
"When enabled, use the first existing sketch with the configured "
"name. If none exists, create it."
),
"type": "checkbox",
"required": False,
},
{
"name": "timeline_name",
"label": "Name of the timeline to create",
Expand Down Expand Up @@ -221,6 +253,11 @@ def upload(
# User supplied config.
sketch_id = task_config.get("sketch_id")
sketch_name = task_config.get("sketch_name")
reuse_existing_sketch = task_config.get("reuse_existing_sketch", False)
if isinstance(reuse_existing_sketch, str):
reuse_existing_sketch = reuse_existing_sketch.lower() in ["true", "1", "yes"]
else:
reuse_existing_sketch = bool(reuse_existing_sketch)

# Analyzers config
selected_analyzers = task_config.get("analyzers", [])
Expand Down Expand Up @@ -266,6 +303,7 @@ def upload(
redis_client,
sketch_id=sketch_id,
sketch_name=sketch_name,
reuse_existing_sketch=reuse_existing_sketch,
workflow_id=workflow_id,
)

Expand Down
158 changes: 150 additions & 8 deletions workers/openrelik-worker-timesketch/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,158 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests tasks."""
"""Tests for Timesketch worker tasks."""

# Note: Use pytest for writing tests!
import pytest
from unittest.mock import MagicMock, patch

# from src.tasks import command
with patch("redis.Redis.from_url"):
from src.tasks import get_or_create_sketch, upload


def test_task_command():
"""Test command task."""
class Sketch:
"""Minimal Timesketch sketch object for tests."""

ret = "some dummy return value"
assert isinstance(ret, str)
def __init__(self, name: str):
self.name = name


def _mock_redis():
redis_client = MagicMock()
redis_client.lock.return_value = MagicMock()
return redis_client


def test_get_or_create_sketch_creates_named_sketch_by_default():
"""Test sketch_name keeps the default behavior of creating a new sketch."""
sketch = Sketch("Case-123")
timesketch_api_client = MagicMock()
timesketch_api_client.create_sketch.return_value = sketch
redis_client = _mock_redis()

result = get_or_create_sketch(
timesketch_api_client,
redis_client,
sketch_name="Case-123",
)

assert result == sketch
timesketch_api_client.create_sketch.assert_called_once_with("Case-123")
timesketch_api_client.list_sketches.assert_not_called()
redis_client.lock.assert_not_called()


def test_get_or_create_sketch_reuses_existing_named_sketch_when_enabled():
"""Test opt-in name reuse returns an existing Timesketch sketch."""
sketch = Sketch("Case-123")
timesketch_api_client = MagicMock()
timesketch_api_client.list_sketches.return_value = [sketch]
redis_client = _mock_redis()

result = get_or_create_sketch(
timesketch_api_client,
redis_client,
sketch_name="Case-123",
reuse_existing_sketch=True,
)

assert result == sketch
redis_client.lock.assert_called_once_with(
"timesketch-sketch-name:Case-123",
timeout=60,
blocking_timeout=5,
)
timesketch_api_client.create_sketch.assert_not_called()


def test_get_or_create_sketch_creates_named_sketch_when_reuse_finds_none():
"""Test opt-in name reuse creates the sketch when no match exists."""
sketch = Sketch("Case-123")
timesketch_api_client = MagicMock()
timesketch_api_client.list_sketches.return_value = []
timesketch_api_client.create_sketch.return_value = sketch
redis_client = _mock_redis()

result = get_or_create_sketch(
timesketch_api_client,
redis_client,
sketch_name="Case-123",
reuse_existing_sketch=True,
)

assert result == sketch
redis_client.lock.assert_called_once_with(
"timesketch-sketch-name:Case-123",
timeout=60,
blocking_timeout=5,
)
timesketch_api_client.create_sketch.assert_called_once_with("Case-123")


def test_get_or_create_sketch_uses_id_before_name_reuse():
"""Test sketch_id remains the explicit lookup when multiple options are set."""
sketch = Sketch("Case-123")
timesketch_api_client = MagicMock()
timesketch_api_client.get_sketch.return_value = sketch
redis_client = _mock_redis()

result = get_or_create_sketch(
timesketch_api_client,
redis_client,
sketch_id="123",
sketch_name="Case-123",
reuse_existing_sketch=True,
)

assert result == sketch
timesketch_api_client.get_sketch.assert_called_once_with(123)
timesketch_api_client.list_sketches.assert_not_called()
timesketch_api_client.create_sketch.assert_not_called()
redis_client.lock.assert_not_called()


def test_upload_passes_reuse_existing_sketch_config():
"""Test upload parses and passes the name reuse checkbox config."""
sketch = MagicMock()
sketch.id = 123

timeline = MagicMock()
timeline.id = 456
timeline.name = "Host-A"

streamer = MagicMock()
streamer.timeline = timeline
streamer_context = MagicMock()
streamer_context.__enter__.return_value = streamer

with (
patch.dict(
"os.environ",
{
"TIMESKETCH_SERVER_URL": "http://timesketch-web:5000",
"TIMESKETCH_SERVER_PUBLIC_URL": "http://127.0.0.1:5000",
"TIMESKETCH_USERNAME": "admin",
"TIMESKETCH_PASSWORD": "password",
},
),
patch(
"src.tasks.get_input_files",
return_value=[{"path": "/tmp/host-a.plaso", "display_name": "host-a.plaso"}],
),
patch("src.tasks.timesketch_client.TimesketchApi"),
patch("src.tasks.get_or_create_sketch", return_value=sketch) as get_sketch,
patch("src.tasks.importer.ImportStreamer", return_value=streamer_context),
patch("src.tasks.create_task_result", return_value="task-result"),
patch.object(upload, "send_event"),
):
result = upload.run(
input_files=[],
workflow_id="workflow-1",
task_config={
"sketch_name": "Case-123",
"reuse_existing_sketch": "true",
"timeline_name": "Host-A",
},
)

assert result == "task-result"
assert get_sketch.call_args.kwargs["reuse_existing_sketch"] is True
Loading