Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
331 changes: 189 additions & 142 deletions testing/backend/unit/test_ratelimit.py
Original file line number Diff line number Diff line change
@@ -1,153 +1,200 @@
"""
Unit tests for backend.secuscan.ratelimit RateLimiter and ConcurrentTaskLimiter.

Covers:
- RateLimiter.can_execute allows requests within the hourly quota
- RateLimiter.can_execute blocks when hourly quota is exhausted
- RateLimiter.can_execute tracks per-client-per-plugin independently
- RateLimiter.can_execute expires old entries after 1 hour (sliding window)
- RateLimiter.reset clears a specific plugin or all buckets
- ConcurrentTaskLimiter.acquire succeeds while under the concurrent limit
- ConcurrentTaskLimiter.acquire fails when the concurrent limit is reached
- ConcurrentTaskLimiter.release removes a task from the running list
- ConcurrentTaskLimiter.get_available_slots returns correct available count
Unit tests for RateLimiter and WorkflowRateLimiter helpers in
backend.secuscan.ratelimit.

Covers (separately from test_endpoint_rate_limiter.py which covers
EndpointRateLimiter and resolve_client_identity):
- RateLimiter.can_execute: quota enforcement, cleanup, independent buckets
- RateLimiter.reset: per-plugin and global reset
- WorkflowRateLimiter.check_workflow_rate_limit: interval enforcement
"""

import asyncio
from datetime import datetime, timedelta

import pytest

from backend.secuscan.ratelimit import RateLimiter, ConcurrentTaskLimiter
from backend.secuscan.ratelimit import RateLimiter, WorkflowRateLimiter


# ---------------------------------------------------------------------------
# RateLimiter.can_execute
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_can_execute_allows_under_quota():
"""When below max_per_hour, can_execute returns (True, '')."""
limiter = RateLimiter()
allowed, msg = await limiter.can_execute("plugin1", max_per_hour=10, client_id="alice")
assert allowed is True
assert msg == ""


@pytest.mark.asyncio
class TestRateLimiter:
async def test_allows_within_quota(self):
"""can_execute returns (True, '') for requests under the hourly limit."""
limiter = RateLimiter()
allowed, msg = await limiter.can_execute("nmap", max_per_hour=5, client_id="c1")
assert allowed is True
assert msg == ""

async def test_blocks_when_quota_exhausted(self):
"""can_execute returns (False, error) once the hourly quota is reached."""
limiter = RateLimiter()
# Exhaust the quota
for _ in range(3):
await limiter.can_execute("nmap", max_per_hour=3, client_id="c1")

allowed, msg = await limiter.can_execute("nmap", max_per_hour=3, client_id="c1")
assert allowed is False
assert "exceeded" in msg.lower()
assert "3/3" in msg

async def test_per_client_per_plugin_independently(self):
"""Different client_ids or plugin_ids have independent quotas."""
limiter = RateLimiter()
# Exhaust c1:nmap quota
for _ in range(2):
await limiter.can_execute("nmap", max_per_hour=2, client_id="c1")

# c2 should still be able to use nmap
allowed, _ = await limiter.can_execute("nmap", max_per_hour=2, client_id="c2")
assert allowed is True

# c1:dirbuster should still have quota
allowed, _ = await limiter.can_execute("dirbuster", max_per_hour=2, client_id="c1")
assert allowed is True

async def test_default_client_id_is_global(self):
"""When client_id is not provided, uses 'global' as the bucket key."""
limiter = RateLimiter()
# Exhaust the global quota for nmap
for _ in range(2):
await limiter.can_execute("nmap", max_per_hour=2)

allowed, _ = await limiter.can_execute("nmap", max_per_hour=2)
assert allowed is False

async def test_reset_plugin_clears_only_that_plugin(self):
"""reset(plugin_id) removes only buckets ending with :<plugin_id>."""
limiter = RateLimiter()
await limiter.can_execute("nmap", max_per_hour=1, client_id="c1")
await limiter.can_execute("dirbuster", max_per_hour=1, client_id="c1")

await limiter.reset("nmap")

# nmap bucket should be cleared
allowed, _ = await limiter.can_execute("nmap", max_per_hour=1, client_id="c1")
assert allowed is True
# dirbuster bucket should be untouched
allowed, _ = await limiter.can_execute("dirbuster", max_per_hour=1, client_id="c1")
assert allowed is False

async def test_reset_all_clears_all_buckets(self):
"""reset(None) clears every bucket."""
limiter = RateLimiter()
await limiter.can_execute("nmap", max_per_hour=1, client_id="c1")
await limiter.can_execute("nmap", max_per_hour=1, client_id="c2")

await limiter.reset()

allowed, _ = await limiter.can_execute("nmap", max_per_hour=1, client_id="c1")
assert allowed is True
allowed, _ = await limiter.can_execute("nmap", max_per_hour=1, client_id="c2")
assert allowed is True
async def test_can_execute_denies_at_quota():
"""When at max_per_hour, can_execute returns (False, error_message)."""
limiter = RateLimiter()
for _ in range(5):
await limiter.can_execute("plugin1", max_per_hour=5, client_id="alice")

allowed, msg = await limiter.can_execute("plugin1", max_per_hour=5, client_id="alice")
assert allowed is False
assert "Rate limit exceeded" in msg
assert "5/5" in msg


@pytest.mark.asyncio
async def test_can_execute_cleans_old_entries():
"""Entries older than 1 hour are removed so they no longer count toward quota."""
limiter = RateLimiter()

# Manually inject an old entry
bucket = "alice:plugin1"
old_time = datetime.now() - timedelta(hours=2)
limiter.task_history[bucket].append(old_time)

# Should allow since the old entry was cleaned up
allowed, msg = await limiter.can_execute("plugin1", max_per_hour=5, client_id="alice")
assert allowed is True
assert len(limiter.task_history[bucket]) == 1 # old entry removed, new one added


@pytest.mark.asyncio
async def test_can_execute_independent_per_client():
"""Each client_id has an independent quota."""
limiter = RateLimiter()

for _ in range(3):
await limiter.can_execute("plugin1", max_per_hour=3, client_id="alice")

# Alice is at quota
allowed_alice, _ = await limiter.can_execute("plugin1", max_per_hour=3, client_id="alice")
assert allowed_alice is False

# Bob has separate quota
allowed_bob, _ = await limiter.can_execute("plugin1", max_per_hour=3, client_id="bob")
assert allowed_bob is True


@pytest.mark.asyncio
async def test_can_execute_independent_per_plugin():
"""Each plugin_id has an independent quota within the same client."""
limiter = RateLimiter()

for _ in range(3):
await limiter.can_execute("nmap", max_per_hour=3, client_id="alice")

# nmap is at quota
allowed_nmap, _ = await limiter.can_execute("nmap", max_per_hour=3, client_id="alice")
assert allowed_nmap is False

# http_inspector is independent
allowed_http, _ = await limiter.can_execute("http_inspector", max_per_hour=3, client_id="alice")
assert allowed_http is True


@pytest.mark.asyncio
async def test_can_execute_default_client_id_is_global():
"""When client_id is not supplied, defaults to 'global' bucket."""
limiter = RateLimiter()

for _ in range(3):
await limiter.can_execute("nmap", max_per_hour=3)

allowed, msg = await limiter.can_execute("nmap", max_per_hour=3)
assert allowed is False
# The bucket should be "global:nmap"
assert "global:nmap" in limiter.task_history


# ---------------------------------------------------------------------------
# RateLimiter.reset
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_reset_clears_matching_plugin():
"""reset(plugin_id) clears all buckets for that plugin across all clients."""
limiter = RateLimiter()

await limiter.can_execute("nmap", max_per_hour=10, client_id="alice")
await limiter.can_execute("nmap", max_per_hour=10, client_id="bob")
await limiter.can_execute("http_inspector", max_per_hour=10, client_id="alice")

assert len(limiter.task_history) > 0

await limiter.reset("nmap")

# nmap buckets should be cleared
assert len(limiter.task_history["alice:nmap"]) == 0
assert len(limiter.task_history["bob:nmap"]) == 0
# http_inspector bucket should be untouched
assert len(limiter.task_history["alice:http_inspector"]) > 0


@pytest.mark.asyncio
async def test_reset_clears_all_buckets():
"""reset() with no argument clears every bucket."""
limiter = RateLimiter()

await limiter.can_execute("nmap", max_per_hour=10, client_id="alice")
await limiter.can_execute("nmap", max_per_hour=10, client_id="bob")

assert len(limiter.task_history) > 0

await limiter.reset()

assert len(limiter.task_history) == 0


# ---------------------------------------------------------------------------
# WorkflowRateLimiter.check_workflow_rate_limit
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_workflow_rate_limiter_allows_first_run():
"""A workflow with no prior run should always be allowed."""
limiter = WorkflowRateLimiter()
allowed, msg = await limiter.check_workflow_rate_limit("wf-1", min_interval_seconds=300)
assert allowed is True
assert msg == ""


@pytest.mark.asyncio
async def test_workflow_rate_limiter_denies_within_interval():
"""A workflow run within the interval should be denied with a remaining-seconds message."""
limiter = WorkflowRateLimiter()

# First run
await limiter.check_workflow_rate_limit("wf-1", min_interval_seconds=300)

# Second run immediately after
allowed, msg = await limiter.check_workflow_rate_limit("wf-1", min_interval_seconds=300)
assert allowed is False
assert "Workflow rate limited" in msg
# remaining should be close to 300
assert "300" in msg or "299" in msg


@pytest.mark.asyncio
async def test_workflow_rate_limiter_allows_after_interval():
"""A workflow run after the interval has passed should be allowed."""
limiter = WorkflowRateLimiter()

# Manually inject a past run
limiter._last_run["wf-1"] = datetime.now() - timedelta(seconds=301)

allowed, msg = await limiter.check_workflow_rate_limit("wf-1", min_interval_seconds=300)
assert allowed is True
assert msg == ""


@pytest.mark.asyncio
class TestConcurrentTaskLimiter:
async def test_acquire_succeeds_under_limit(self):
"""acquire returns (True, '') when slots are available."""
limiter = ConcurrentTaskLimiter(max_concurrent=3)
acquired, msg = await limiter.acquire("task-1")
assert acquired is True
assert msg == ""

async def test_acquire_fails_at_limit(self):
"""acquire returns (False, error) when all slots are occupied."""
limiter = ConcurrentTaskLimiter(max_concurrent=2)
await limiter.acquire("task-1")
await limiter.acquire("task-2")

acquired, msg = await limiter.acquire("task-3")
assert acquired is False
assert "Maximum concurrent tasks" in msg

async def test_release_frees_a_slot(self):
"""release(task_id) removes the task and frees its slot."""
limiter = ConcurrentTaskLimiter(max_concurrent=2)
await limiter.acquire("task-1")
await limiter.acquire("task-2")
# At limit, next acquire should fail
acquired, _ = await limiter.acquire("task-3")
assert acquired is False

# Release task-1, should free a slot
await limiter.release("task-1")
acquired, _ = await limiter.acquire("task-3")
assert acquired is True

async def test_release_unknown_task_is_noop(self):
"""release(task_id) where task_id is not in running_tasks is a no-op."""
limiter = ConcurrentTaskLimiter(max_concurrent=2)
await limiter.acquire("task-1")
# Releasing a non-existent task should not affect state
await limiter.release("nonexistent-task")
# Still at capacity
acquired, _ = await limiter.acquire("task-2")
assert acquired is True
acquired, _ = await limiter.acquire("task-3")
assert acquired is False

async def test_get_available_slots(self):
"""get_available_slots returns max_concurrent minus running task count."""
limiter = ConcurrentTaskLimiter(max_concurrent=3)
assert await limiter.get_available_slots() == 3

await limiter.acquire("task-1")
assert await limiter.get_available_slots() == 2

await limiter.acquire("task-2")
assert await limiter.get_available_slots() == 1

await limiter.release("task-1")
assert await limiter.get_available_slots() == 2
async def test_workflow_rate_limiter_each_workflow_independent():
"""Workflow rate limits are independent per workflow_id."""
limiter = WorkflowRateLimiter()

# Run wf-1
await limiter.check_workflow_rate_limit("wf-1", min_interval_seconds=300)
# wf-2 has never run
allowed, msg = await limiter.check_workflow_rate_limit("wf-2", min_interval_seconds=300)
assert allowed is True
Loading