From 34e15a3116abaa7c35027885532bf3d9192f279f Mon Sep 17 00:00:00 2001 From: tmdeveloper007 Date: Tue, 23 Jun 2026 07:42:28 +0000 Subject: [PATCH] test : add unit tests for RateLimiter and WorkflowRateLimiter helpers --- testing/backend/unit/test_ratelimit.py | 331 ++++++++++++++----------- 1 file changed, 189 insertions(+), 142 deletions(-) diff --git a/testing/backend/unit/test_ratelimit.py b/testing/backend/unit/test_ratelimit.py index 6a73f46f..04ca1a0a 100644 --- a/testing/backend/unit/test_ratelimit.py +++ b/testing/backend/unit/test_ratelimit.py @@ -1,153 +1,200 @@ """ -Unit tests for backend.secuscan.ratelimit RateLimiter and ConcurrentTaskLimiter. - -Covers: -- RateLimiter.can_execute allows requests within the hourly quota -- RateLimiter.can_execute blocks when hourly quota is exhausted -- RateLimiter.can_execute tracks per-client-per-plugin independently -- RateLimiter.can_execute expires old entries after 1 hour (sliding window) -- RateLimiter.reset clears a specific plugin or all buckets -- ConcurrentTaskLimiter.acquire succeeds while under the concurrent limit -- ConcurrentTaskLimiter.acquire fails when the concurrent limit is reached -- ConcurrentTaskLimiter.release removes a task from the running list -- ConcurrentTaskLimiter.get_available_slots returns correct available count +Unit tests for RateLimiter and WorkflowRateLimiter helpers in +backend.secuscan.ratelimit. + +Covers (separately from test_endpoint_rate_limiter.py which covers +EndpointRateLimiter and resolve_client_identity): +- RateLimiter.can_execute: quota enforcement, cleanup, independent buckets +- RateLimiter.reset: per-plugin and global reset +- WorkflowRateLimiter.check_workflow_rate_limit: interval enforcement """ +import asyncio +from datetime import datetime, timedelta + import pytest -from backend.secuscan.ratelimit import RateLimiter, ConcurrentTaskLimiter +from backend.secuscan.ratelimit import RateLimiter, WorkflowRateLimiter + + +# --------------------------------------------------------------------------- +# RateLimiter.can_execute +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_can_execute_allows_under_quota(): + """When below max_per_hour, can_execute returns (True, '').""" + limiter = RateLimiter() + allowed, msg = await limiter.can_execute("plugin1", max_per_hour=10, client_id="alice") + assert allowed is True + assert msg == "" @pytest.mark.asyncio -class TestRateLimiter: - async def test_allows_within_quota(self): - """can_execute returns (True, '') for requests under the hourly limit.""" - limiter = RateLimiter() - allowed, msg = await limiter.can_execute("nmap", max_per_hour=5, client_id="c1") - assert allowed is True - assert msg == "" - - async def test_blocks_when_quota_exhausted(self): - """can_execute returns (False, error) once the hourly quota is reached.""" - limiter = RateLimiter() - # Exhaust the quota - for _ in range(3): - await limiter.can_execute("nmap", max_per_hour=3, client_id="c1") - - allowed, msg = await limiter.can_execute("nmap", max_per_hour=3, client_id="c1") - assert allowed is False - assert "exceeded" in msg.lower() - assert "3/3" in msg - - async def test_per_client_per_plugin_independently(self): - """Different client_ids or plugin_ids have independent quotas.""" - limiter = RateLimiter() - # Exhaust c1:nmap quota - for _ in range(2): - await limiter.can_execute("nmap", max_per_hour=2, client_id="c1") - - # c2 should still be able to use nmap - allowed, _ = await limiter.can_execute("nmap", max_per_hour=2, client_id="c2") - assert allowed is True - - # c1:dirbuster should still have quota - allowed, _ = await limiter.can_execute("dirbuster", max_per_hour=2, client_id="c1") - assert allowed is True - - async def test_default_client_id_is_global(self): - """When client_id is not provided, uses 'global' as the bucket key.""" - limiter = RateLimiter() - # Exhaust the global quota for nmap - for _ in range(2): - await limiter.can_execute("nmap", max_per_hour=2) - - allowed, _ = await limiter.can_execute("nmap", max_per_hour=2) - assert allowed is False - - async def test_reset_plugin_clears_only_that_plugin(self): - """reset(plugin_id) removes only buckets ending with :.""" - limiter = RateLimiter() - await limiter.can_execute("nmap", max_per_hour=1, client_id="c1") - await limiter.can_execute("dirbuster", max_per_hour=1, client_id="c1") - - await limiter.reset("nmap") - - # nmap bucket should be cleared - allowed, _ = await limiter.can_execute("nmap", max_per_hour=1, client_id="c1") - assert allowed is True - # dirbuster bucket should be untouched - allowed, _ = await limiter.can_execute("dirbuster", max_per_hour=1, client_id="c1") - assert allowed is False - - async def test_reset_all_clears_all_buckets(self): - """reset(None) clears every bucket.""" - limiter = RateLimiter() - await limiter.can_execute("nmap", max_per_hour=1, client_id="c1") - await limiter.can_execute("nmap", max_per_hour=1, client_id="c2") - - await limiter.reset() - - allowed, _ = await limiter.can_execute("nmap", max_per_hour=1, client_id="c1") - assert allowed is True - allowed, _ = await limiter.can_execute("nmap", max_per_hour=1, client_id="c2") - assert allowed is True +async def test_can_execute_denies_at_quota(): + """When at max_per_hour, can_execute returns (False, error_message).""" + limiter = RateLimiter() + for _ in range(5): + await limiter.can_execute("plugin1", max_per_hour=5, client_id="alice") + + allowed, msg = await limiter.can_execute("plugin1", max_per_hour=5, client_id="alice") + assert allowed is False + assert "Rate limit exceeded" in msg + assert "5/5" in msg + + +@pytest.mark.asyncio +async def test_can_execute_cleans_old_entries(): + """Entries older than 1 hour are removed so they no longer count toward quota.""" + limiter = RateLimiter() + + # Manually inject an old entry + bucket = "alice:plugin1" + old_time = datetime.now() - timedelta(hours=2) + limiter.task_history[bucket].append(old_time) + + # Should allow since the old entry was cleaned up + allowed, msg = await limiter.can_execute("plugin1", max_per_hour=5, client_id="alice") + assert allowed is True + assert len(limiter.task_history[bucket]) == 1 # old entry removed, new one added + + +@pytest.mark.asyncio +async def test_can_execute_independent_per_client(): + """Each client_id has an independent quota.""" + limiter = RateLimiter() + + for _ in range(3): + await limiter.can_execute("plugin1", max_per_hour=3, client_id="alice") + + # Alice is at quota + allowed_alice, _ = await limiter.can_execute("plugin1", max_per_hour=3, client_id="alice") + assert allowed_alice is False + + # Bob has separate quota + allowed_bob, _ = await limiter.can_execute("plugin1", max_per_hour=3, client_id="bob") + assert allowed_bob is True + + +@pytest.mark.asyncio +async def test_can_execute_independent_per_plugin(): + """Each plugin_id has an independent quota within the same client.""" + limiter = RateLimiter() + + for _ in range(3): + await limiter.can_execute("nmap", max_per_hour=3, client_id="alice") + + # nmap is at quota + allowed_nmap, _ = await limiter.can_execute("nmap", max_per_hour=3, client_id="alice") + assert allowed_nmap is False + + # http_inspector is independent + allowed_http, _ = await limiter.can_execute("http_inspector", max_per_hour=3, client_id="alice") + assert allowed_http is True + + +@pytest.mark.asyncio +async def test_can_execute_default_client_id_is_global(): + """When client_id is not supplied, defaults to 'global' bucket.""" + limiter = RateLimiter() + + for _ in range(3): + await limiter.can_execute("nmap", max_per_hour=3) + + allowed, msg = await limiter.can_execute("nmap", max_per_hour=3) + assert allowed is False + # The bucket should be "global:nmap" + assert "global:nmap" in limiter.task_history + + +# --------------------------------------------------------------------------- +# RateLimiter.reset +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_reset_clears_matching_plugin(): + """reset(plugin_id) clears all buckets for that plugin across all clients.""" + limiter = RateLimiter() + + await limiter.can_execute("nmap", max_per_hour=10, client_id="alice") + await limiter.can_execute("nmap", max_per_hour=10, client_id="bob") + await limiter.can_execute("http_inspector", max_per_hour=10, client_id="alice") + + assert len(limiter.task_history) > 0 + + await limiter.reset("nmap") + + # nmap buckets should be cleared + assert len(limiter.task_history["alice:nmap"]) == 0 + assert len(limiter.task_history["bob:nmap"]) == 0 + # http_inspector bucket should be untouched + assert len(limiter.task_history["alice:http_inspector"]) > 0 + + +@pytest.mark.asyncio +async def test_reset_clears_all_buckets(): + """reset() with no argument clears every bucket.""" + limiter = RateLimiter() + + await limiter.can_execute("nmap", max_per_hour=10, client_id="alice") + await limiter.can_execute("nmap", max_per_hour=10, client_id="bob") + + assert len(limiter.task_history) > 0 + + await limiter.reset() + + assert len(limiter.task_history) == 0 + + +# --------------------------------------------------------------------------- +# WorkflowRateLimiter.check_workflow_rate_limit +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_workflow_rate_limiter_allows_first_run(): + """A workflow with no prior run should always be allowed.""" + limiter = WorkflowRateLimiter() + allowed, msg = await limiter.check_workflow_rate_limit("wf-1", min_interval_seconds=300) + assert allowed is True + assert msg == "" + + +@pytest.mark.asyncio +async def test_workflow_rate_limiter_denies_within_interval(): + """A workflow run within the interval should be denied with a remaining-seconds message.""" + limiter = WorkflowRateLimiter() + + # First run + await limiter.check_workflow_rate_limit("wf-1", min_interval_seconds=300) + + # Second run immediately after + allowed, msg = await limiter.check_workflow_rate_limit("wf-1", min_interval_seconds=300) + assert allowed is False + assert "Workflow rate limited" in msg + # remaining should be close to 300 + assert "300" in msg or "299" in msg + + +@pytest.mark.asyncio +async def test_workflow_rate_limiter_allows_after_interval(): + """A workflow run after the interval has passed should be allowed.""" + limiter = WorkflowRateLimiter() + + # Manually inject a past run + limiter._last_run["wf-1"] = datetime.now() - timedelta(seconds=301) + + allowed, msg = await limiter.check_workflow_rate_limit("wf-1", min_interval_seconds=300) + assert allowed is True + assert msg == "" @pytest.mark.asyncio -class TestConcurrentTaskLimiter: - async def test_acquire_succeeds_under_limit(self): - """acquire returns (True, '') when slots are available.""" - limiter = ConcurrentTaskLimiter(max_concurrent=3) - acquired, msg = await limiter.acquire("task-1") - assert acquired is True - assert msg == "" - - async def test_acquire_fails_at_limit(self): - """acquire returns (False, error) when all slots are occupied.""" - limiter = ConcurrentTaskLimiter(max_concurrent=2) - await limiter.acquire("task-1") - await limiter.acquire("task-2") - - acquired, msg = await limiter.acquire("task-3") - assert acquired is False - assert "Maximum concurrent tasks" in msg - - async def test_release_frees_a_slot(self): - """release(task_id) removes the task and frees its slot.""" - limiter = ConcurrentTaskLimiter(max_concurrent=2) - await limiter.acquire("task-1") - await limiter.acquire("task-2") - # At limit, next acquire should fail - acquired, _ = await limiter.acquire("task-3") - assert acquired is False - - # Release task-1, should free a slot - await limiter.release("task-1") - acquired, _ = await limiter.acquire("task-3") - assert acquired is True - - async def test_release_unknown_task_is_noop(self): - """release(task_id) where task_id is not in running_tasks is a no-op.""" - limiter = ConcurrentTaskLimiter(max_concurrent=2) - await limiter.acquire("task-1") - # Releasing a non-existent task should not affect state - await limiter.release("nonexistent-task") - # Still at capacity - acquired, _ = await limiter.acquire("task-2") - assert acquired is True - acquired, _ = await limiter.acquire("task-3") - assert acquired is False - - async def test_get_available_slots(self): - """get_available_slots returns max_concurrent minus running task count.""" - limiter = ConcurrentTaskLimiter(max_concurrent=3) - assert await limiter.get_available_slots() == 3 - - await limiter.acquire("task-1") - assert await limiter.get_available_slots() == 2 - - await limiter.acquire("task-2") - assert await limiter.get_available_slots() == 1 - - await limiter.release("task-1") - assert await limiter.get_available_slots() == 2 +async def test_workflow_rate_limiter_each_workflow_independent(): + """Workflow rate limits are independent per workflow_id.""" + limiter = WorkflowRateLimiter() + + # Run wf-1 + await limiter.check_workflow_rate_limit("wf-1", min_interval_seconds=300) + # wf-2 has never run + allowed, msg = await limiter.check_workflow_rate_limit("wf-2", min_interval_seconds=300) + assert allowed is True