diff --git a/README.md b/README.md index a33a8b6..7711fce 100644 --- a/README.md +++ b/README.md @@ -215,7 +215,8 @@ hubble-audit2policy [-h] [-o OUTPUT_DIR] [-n NAMESPACE] [--until DURATION] [--loki-limit N] [--loki-user USER] [--loki-password PASSWORD] [--loki-token TOKEN] [--loki-tls-ca PATH] - [--loki-org-id ORG_ID] + [--loki-org-id ORG_ID] [--loki-threads N] + [--loki-chunk DURATION] [--loki-timeout SECONDS] [-v] [-V] [flows_file] ``` @@ -241,12 +242,15 @@ hubble-audit2policy [-h] [-o OUTPUT_DIR] [-n NAMESPACE] | `--loki-query LOGQL` | LogQL stream selector (default: `{container="cilium-agent"}`) | | `--since DURATION` | How far back to query, e.g. `30m`, `2h`, `1d` (default: `1h`) | | `--until DURATION` | End of query window as duration before now (default: `0s` = now) | -| `--loki-limit N` | Max entries per Loki request batch (default: `1000`) | +| `--loki-limit N` | Max entries per Loki request batch (default: `5000`) | | `--loki-user USER` | Username for Loki HTTP Basic authentication | | `--loki-password PASSWORD` | Password for Loki HTTP Basic authentication (used with `--loki-user`) | | `--loki-token TOKEN` | Bearer token for Loki (`Authorization: Bearer ...`) header | | `--loki-tls-ca PATH` | Path to a PEM CA certificate for verifying the Loki server (self-signed certs) | | `--loki-org-id ORG_ID` | Tenant ID sent as `X-Scope-OrgID` header (required when Loki `auth_enabled=true`) | +| `--loki-threads N` | Number of parallel worker threads for Loki queries (default: `8`) | +| `--loki-chunk DURATION` | Max time window per Loki request, e.g. `5s`, `30s`, `1m` (default: `5m` with `-n`, `5s` otherwise) | +| `--loki-timeout SECONDS` | HTTP request timeout in seconds for each Loki request (default: `30`) | | `-v, --verbose` | Enable verbose logging | | `-V, --version` | Show version and exit | diff --git a/hubble_audit2policy.py b/hubble_audit2policy.py index 85bbf57..b789a7a 100755 --- a/hubble_audit2policy.py +++ b/hubble_audit2policy.py @@ -8,7 +8,7 @@ from __future__ import annotations -__version__ = "0.11.0" +__version__ = "0.12.0" __author__ = "noexecstack" __license__ = "Apache-2.0" @@ -33,7 +33,7 @@ import urllib.parse import urllib.request from collections import Counter, defaultdict, deque -from collections.abc import Generator, Iterator +from collections.abc import Generator, Iterable, Iterator from contextlib import contextmanager from typing import IO, Any, ClassVar, cast @@ -59,6 +59,15 @@ AppPods, ] +# Result from _read_flows_loki(). +class LokiResult: + __slots__ = ("flows", "partial") + + def __init__(self, flows: list[tuple[int, dict[str, Any]]], partial: bool) -> None: + self.flows = flows + self.partial = partial + + # Exit codes EXIT_OK = 0 EXIT_ERROR = 1 @@ -426,6 +435,26 @@ def _build_loki_ssl_context( return ctx +def _loki_enrich_query(query: str, namespaces: Iterable[str]) -> str: + r"""Append LogQL line filters for *namespaces* to *query*. + + When the user passes ``-n argocd``, we inject a filter that matches + the JSON field pattern ``"namespace":"argocd"`` so Loki filters lines + server-side. This avoids false positives from pod names or labels + that happen to contain the namespace string. + + Multiple namespaces are OR-joined into a single regex filter, e.g. + ``"namespace":"(argocd|monitoring)"``. + """ + ns_list = sorted(set(namespaces)) + if not ns_list: + return query + if len(ns_list) == 1: + return query + r' |= "\"namespace\":\"' + ns_list[0] + r'\""' + pattern = "|".join(re.escape(ns) for ns in ns_list) + return query + r' |~ "\"namespace\":\"(' + pattern + r')\""' + + def _loki_fetch_chunk( base_url: str, query: str, @@ -434,6 +463,7 @@ def _loki_fetch_chunk( limit: int, headers: dict[str, str], ssl_ctx: ssl.SSLContext | None, + timeout: int = 30, ) -> list[tuple[int, dict[str, Any]]]: """Fetch log entries from Loki for a single time segment. @@ -457,7 +487,7 @@ def _loki_fetch_chunk( req = urllib.request.Request(url, headers=headers) try: - with urllib.request.urlopen(req, timeout=120, context=ssl_ctx) as resp: + with urllib.request.urlopen(req, timeout=timeout, context=ssl_ctx) as resp: body = json.loads(resp.read().decode("utf-8")) except (urllib.error.URLError, OSError) as exc: LOG.error("Failed to query Loki: %s", exc) @@ -507,13 +537,19 @@ def _read_flows_loki( loki_token: str | None = None, loki_tls_ca: str | None = None, loki_org_id: str | None = None, - threads: int = 4, -) -> Iterator[tuple[int, dict[str, Any]]]: - """Yield *(lineno, flow_dict)* by querying a Loki instance. + threads: int = 8, + chunk_seconds: float = 5, + timeout: int = 30, +) -> LokiResult: + """Fetch flows from a Loki instance. + + Returns a `LokiResult` with *(lineno, flow_dict)* pairs and a + *partial* flag indicating whether the fetch was interrupted. - The query time range is split into *threads* equal segments and - fetched in parallel using a thread pool, then merged in timestamp - order with monotonically increasing line numbers. + The query time range is split into small time chunks (controlled by + *chunk_seconds*) and fetched in parallel using a thread pool of + *threads* workers, then merged in timestamp order with monotonically + increasing line numbers. Parameters ---------- @@ -536,7 +572,13 @@ def _read_flows_loki( loki_tls_ca: Path to a PEM CA certificate for TLS verification. threads: - Number of parallel time segments (default: 4). + Number of parallel worker threads (default: 4). + chunk_seconds: + Maximum time window per Loki request in seconds (default: 30). + Smaller chunks reduce per-request load and are less likely to + time out. + timeout: + HTTP request timeout in seconds (default: 30). """ now = time.time() start_ns = int((now - since_seconds) * 1_000_000_000) @@ -556,35 +598,62 @@ def _read_flows_loki( ssl_ctx = _build_loki_ssl_context(loki_tls_ca) - # Determine how many parallel segments to use. Avoid creating more - # segments than there are seconds in the range. + # Split the time range into small chunks. Each chunk covers at most + # *chunk_seconds* of wall time so individual Loki requests stay fast. span = end_ns - start_ns - num_segments = max(1, min(threads, span // 1_000_000_000)) + chunk_ns = max(int(chunk_seconds * 1_000_000_000), 1_000_000_000) # floor 1s + num_chunks = max(1, -(-span // chunk_ns)) # ceiling division - if num_segments <= 1: - # Fast path: single segment, no thread pool overhead. - entries = _loki_fetch_chunk(base, query, start_ns, end_ns, limit, headers, ssl_ctx) - else: - boundaries = [start_ns + span * i // num_segments for i in range(num_segments)] - boundaries.append(end_ns) - segments = [(boundaries[i], boundaries[i + 1]) for i in range(num_segments)] + boundaries = [start_ns + span * i // num_chunks for i in range(num_chunks)] + boundaries.append(end_ns) + segments = [(boundaries[i], boundaries[i + 1]) for i in range(num_chunks)] + + interrupted = False + if len(segments) <= 1: + # Fast path: single chunk, no thread pool overhead. + try: + entries = _loki_fetch_chunk( + base, query, start_ns, end_ns, limit, headers, ssl_ctx, timeout + ) + except KeyboardInterrupt: + interrupted = True + entries = [] + else: + num_workers = min(threads, len(segments)) all_entries: list[tuple[int, dict[str, Any]]] = [] - with concurrent.futures.ThreadPoolExecutor(max_workers=num_segments) as pool: + done = 0 + with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as pool: futures = [ - pool.submit(_loki_fetch_chunk, base, query, s, e, limit, headers, ssl_ctx) + pool.submit( + _loki_fetch_chunk, base, query, s, e, limit, headers, ssl_ctx, timeout + ) for s, e in segments ] - for future in concurrent.futures.as_completed(futures): - all_entries.extend(future.result()) + try: + for future in concurrent.futures.as_completed(futures): + all_entries.extend(future.result()) + done += 1 + pct = done * 100 // len(segments) + print( + f"\r Fetched chunk {done}/{len(segments)} ({pct}%) " + f"-- {len(all_entries)} flows so far ...", + end="", + file=sys.stderr, + ) + except KeyboardInterrupt: + interrupted = True + for f in futures: + f.cancel() + print(file=sys.stderr) # newline after progress entries = all_entries entries.sort(key=lambda x: x[0]) LOG.debug("Fetched %d log entries from Loki", len(entries)) - for lineno, (_, flow) in enumerate(entries, 1): - yield lineno, flow + flows = [(lineno, flow) for lineno, (_, flow) in enumerate(entries, 1)] + return LokiResult(flows=flows, partial=interrupted) def parse_flows( @@ -1474,8 +1543,10 @@ def _draw_loki_header( since: str, until: str, flow_count: int, + load_seconds: float, is_selecting: bool, selected_count: int, + partial: bool = False, ) -> None: """Render the fixed header rows (0-2) for Loki watch mode TUI.""" # Row 0: source + timestamp @@ -1491,8 +1562,12 @@ def _draw_loki_header( # Row 1: time range + flow count + status badge prefix = f"Range: since={since} until={until} | Flows: {flow_count} " - badge = "* Loaded" - badge_attr = curses.color_pair(1) if curses.has_colors() else curses.A_NORMAL + if partial: + badge = f"* Partial results (interrupted) -- {load_seconds:.1f}s" + badge_attr = curses.color_pair(2) if curses.has_colors() else curses.A_NORMAL + else: + badge = f"* Loaded in {load_seconds:.1f}s" + badge_attr = curses.color_pair(1) if curses.has_colors() else curses.A_NORMAL try: stdscr.addnstr(1, 0, prefix, width - 1) stdscr.addnstr(1, len(prefix), badge, width - 1 - len(prefix), badge_attr) @@ -1591,17 +1666,20 @@ def _loki_watch_mode(args: argparse.Namespace) -> None: since_sec = _parse_duration(args.since) until_sec = _parse_duration(args.until) + loki_query = _loki_enrich_query(args.loki_query, namespaces) + chunk_default = "5m" if namespaces else "5s" + chunk_sec = _parse_duration(args.loki_chunk or chunk_default) print( f"Querying Loki at {args.loki_url} " - f"(query={args.loki_query!r}, since={args.since}, until={args.until}) ...", + f"(query={loki_query!r}, since={args.since}, until={args.until}) ...", file=sys.stderr, ) - loki_flows: list[dict[str, Any]] = [] - for _, flow in _read_flows_loki( + t0 = time.monotonic() + result = _read_flows_loki( args.loki_url, - args.loki_query, + loki_query, since_sec, until_sec, args.loki_limit, @@ -1611,13 +1689,30 @@ def _loki_watch_mode(args: argparse.Namespace) -> None: loki_tls_ca=args.loki_tls_ca, loki_org_id=args.loki_org_id, threads=args.loki_threads, - ): - loki_flows.append(flow) + chunk_seconds=chunk_sec, + timeout=args.loki_timeout, + ) + load_elapsed = time.monotonic() - t0 + partial = result.partial + loki_flows = [flow for _, flow in result.flows] - print(f"Loaded {len(loki_flows)} flows from Loki.", file=sys.stderr) + if partial: + print( + f"Interrupted -- continuing with {len(loki_flows)} flows " + f"fetched in {load_elapsed:.1f}s.", + file=sys.stderr, + ) + else: + print( + f"Loaded {len(loki_flows)} flows from Loki in {load_elapsed:.1f}s.", + file=sys.stderr, + ) if not loki_flows: - LOG.warning("No flows returned from Loki query") + if partial: + print("No flows fetched before interrupt.", file=sys.stderr) + else: + LOG.warning("No flows returned from Loki query") sys.exit(EXIT_NO_POLICIES) # Header layout matches live watch mode. @@ -1746,7 +1841,9 @@ def _run(stdscr: curses.window) -> None: # type: ignore[name-defined] since=args.since, until=args.until, flow_count=len(loki_flows), + load_seconds=load_elapsed, is_selecting=is_selecting, + partial=partial, selected_count=len(selected_keys), ) _draw_content( @@ -2340,9 +2437,23 @@ def _build_parser() -> argparse.ArgumentParser: loki_group.add_argument( "--loki-threads", type=int, - default=4, + default=8, metavar="N", - help="Number of parallel time segments for Loki queries (default: 4)", + help="Number of parallel worker threads for Loki queries (default: 8)", + ) + loki_group.add_argument( + "--loki-chunk", + default=None, + metavar="DURATION", + help="Max time window per Loki request, e.g. 5s, 30s, 1m " + "(default: 5m when -n/--namespace filters are active, 5s otherwise)", + ) + loki_group.add_argument( + "--loki-timeout", + type=int, + default=30, + metavar="SECONDS", + help="HTTP request timeout in seconds for each Loki request (default: 30)", ) parser.add_argument( @@ -2391,9 +2502,17 @@ def main() -> None: parser.error("--loki-password requires --loki-user") since_sec = _parse_duration(args.since) until_sec = _parse_duration(args.until) - loki_iter = _read_flows_loki( + loki_query = _loki_enrich_query(args.loki_query, namespaces) + chunk_default = "5m" if namespaces else "5s" + chunk_sec = _parse_duration(args.loki_chunk or chunk_default) + print( + f"Querying Loki at {args.loki_url} " + f"(query={loki_query!r}, since={args.since}, until={args.until}) ...", + file=sys.stderr, + ) + loki_result = _read_flows_loki( args.loki_url, - args.loki_query, + loki_query, since_sec, until_sec, args.loki_limit, @@ -2403,14 +2522,17 @@ def main() -> None: loki_tls_ca=args.loki_tls_ca, loki_org_id=args.loki_org_id, threads=args.loki_threads, + chunk_seconds=chunk_sec, + timeout=args.loki_timeout, ) - print( - f"Querying Loki at {args.loki_url} " - f"(query={args.loki_query!r}, since={args.since}, until={args.until}) ...", - file=sys.stderr, - ) + if loki_result.partial: + print( + f"Interrupted -- generating output from {len(loki_result.flows)} " + f"flows fetched so far.", + file=sys.stderr, + ) policies, flow_counts, total, matched, app_pods = parse_flows( - "", label_keys, verdicts, namespaces, flow_iter=loki_iter + "", label_keys, verdicts, namespaces, flow_iter=iter(loki_result.flows) ) else: if not args.flows_file: diff --git a/pyproject.toml b/pyproject.toml index 0a6c9f8..131f45a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "hubble-audit2policy" -version = "0.11.0" +version = "0.12.0" description = "Generate least-privilege CiliumNetworkPolicy YAML from Hubble flow logs." readme = "README.md" license = "Apache-2.0" diff --git a/tests/test_flow_parsing.py b/tests/test_flow_parsing.py index 6841c3c..c09fb02 100644 --- a/tests/test_flow_parsing.py +++ b/tests/test_flow_parsing.py @@ -290,9 +290,10 @@ def test_basic_fetch(self) -> None: mock_resp.__exit__ = mock.Mock(return_value=False) mock_open.return_value = mock_resp - result = list( - h._read_flows_loki("http://loki:3100", '{app="hubble"}', 3600, 0, threads=1) - ) + result = h._read_flows_loki( + "http://loki:3100", '{app="hubble"}', 3600, 0, + threads=1, chunk_seconds=7200, + ).flows assert len(result) == 2 assert result[0][1]["l4"]["TCP"]["destination_port"] == 80 assert result[1][1]["l4"]["TCP"]["destination_port"] == 443 @@ -307,17 +308,19 @@ def test_error_response(self) -> None: mock_resp.__exit__ = mock.Mock(return_value=False) mock_open.return_value = mock_resp - result = list( - h._read_flows_loki("http://loki:3100", '{app="hubble"}', 3600, 0, threads=1) - ) + result = h._read_flows_loki( + "http://loki:3100", '{app="hubble"}', 3600, 0, + threads=1, chunk_seconds=7200, + ).flows assert len(result) == 0 def test_connection_error(self) -> None: with mock.patch("hubble_audit2policy.urllib.request.urlopen") as mock_open: mock_open.side_effect = OSError("connection refused") - result = list( - h._read_flows_loki("http://loki:3100", '{app="hubble"}', 3600, 0, threads=1) - ) + result = h._read_flows_loki( + "http://loki:3100", '{app="hubble"}', 3600, 0, + threads=1, chunk_seconds=7200, + ).flows assert len(result) == 0 def test_malformed_json_skipped(self) -> None: @@ -340,9 +343,10 @@ def test_malformed_json_skipped(self) -> None: mock_resp.__exit__ = mock.Mock(return_value=False) mock_open.return_value = mock_resp - result = list( - h._read_flows_loki("http://loki:3100", '{app="hubble"}', 3600, 0, threads=1) - ) + result = h._read_flows_loki( + "http://loki:3100", '{app="hubble"}', 3600, 0, + threads=1, chunk_seconds=7200, + ).flows assert len(result) == 2 def test_pagination(self) -> None: @@ -399,11 +403,10 @@ def test_pagination(self) -> None: with mock.patch("hubble_audit2policy.urllib.request.urlopen") as mock_open: mock_open.side_effect = responses # limit=1 forces pagination after each entry. - result = list( - h._read_flows_loki( - "http://loki:3100", '{app="hubble"}', 3600, 0, limit=1, threads=1 - ) - ) + result = h._read_flows_loki( + "http://loki:3100", '{app="hubble"}', 3600, 0, limit=1, + threads=1, chunk_seconds=7200, + ).flows assert len(result) == 2 assert result[0][1]["l4"]["TCP"]["destination_port"] == 80 assert result[1][1]["l4"]["TCP"]["destination_port"] == 443 @@ -441,9 +444,10 @@ def test_multiple_streams(self) -> None: mock_resp.__exit__ = mock.Mock(return_value=False) mock_open.return_value = mock_resp - result = list( - h._read_flows_loki("http://loki:3100", '{app="hubble"}', 3600, 0, threads=1) - ) + result = h._read_flows_loki( + "http://loki:3100", '{app="hubble"}', 3600, 0, + threads=1, chunk_seconds=7200, + ).flows assert len(result) == 2 def test_query_params_forwarded(self) -> None: @@ -462,10 +466,9 @@ def test_query_params_forwarded(self) -> None: mock_resp.__exit__ = mock.Mock(return_value=False) mock_open.return_value = mock_resp - list( - h._read_flows_loki( - "http://loki:3100", '{namespace="hubble"}', 7200, 0, limit=100, threads=1 - ) + h._read_flows_loki( + "http://loki:3100", '{namespace="hubble"}', 7200, 0, limit=100, + threads=1, chunk_seconds=14400, ) req = mock_open.call_args[0][0] url = req.full_url @@ -499,9 +502,10 @@ def test_empty_log_lines_skipped(self) -> None: mock_resp.__exit__ = mock.Mock(return_value=False) mock_open.return_value = mock_resp - result = list( - h._read_flows_loki("http://loki:3100", '{app="hubble"}', 3600, 0, threads=1) - ) + result = h._read_flows_loki( + "http://loki:3100", '{app="hubble"}', 3600, 0, + threads=1, chunk_seconds=7200, + ).flows assert len(result) == 1 def test_parallel_merges_segments(self) -> None: @@ -537,9 +541,10 @@ def _fake_urlopen(req, **kwargs): # noqa: ARG001 mid_ns = (start_ns + end_ns) // 2 with mock.patch("hubble_audit2policy.urllib.request.urlopen", side_effect=_fake_urlopen): - result = list( - h._read_flows_loki("http://loki:3100", '{app="hubble"}', 3600, 0, threads=2) - ) + result = h._read_flows_loki( + "http://loki:3100", '{app="hubble"}', 3600, 0, + threads=2, chunk_seconds=1800, + ).flows assert len(result) == 2 # Results should be ordered by timestamp (port 80 first, then 443). assert result[0][1]["l4"]["TCP"]["destination_port"] == 80 @@ -548,6 +553,37 @@ def _fake_urlopen(req, **kwargs): # noqa: ARG001 assert result[0][0] == 1 assert result[1][0] == 2 + def test_chunk_seconds_splits_into_many_requests(self) -> None: + """Small chunk_seconds creates more Loki requests than threads.""" + flow = _make_flow(port=80) + + def _fake_urlopen(req, **kwargs): # noqa: ARG001 + body = { + "status": "success", + "data": { + "result": [ + {"stream": {}, "values": [["1000000000", json.dumps(flow)]]} + ] + }, + } + resp = mock.MagicMock() + resp.read.return_value = json.dumps(body).encode() + resp.__enter__ = mock.Mock(return_value=resp) + resp.__exit__ = mock.Mock(return_value=False) + return resp + + with mock.patch( + "hubble_audit2policy.urllib.request.urlopen", side_effect=_fake_urlopen + ) as mock_open: + # 1h range with 600s chunks = 6 chunks, but only 2 worker threads. + result = h._read_flows_loki( + "http://loki:3100", '{app="hubble"}', 3600, 0, + threads=2, chunk_seconds=600, + ).flows + # 6 chunks, each returning 1 flow. + assert len(result) == 6 + assert mock_open.call_count == 6 + class TestLokiEndToEnd: """End-to-end: Loki response -> parse_flows -> policies.""" @@ -572,9 +608,12 @@ def test_loki_flows_produce_policies(self) -> None: mock_resp.__exit__ = mock.Mock(return_value=False) mock_open.return_value = mock_resp - loki_iter = h._read_flows_loki("http://loki:3100", '{app="hubble"}', 3600, 0, threads=1) + loki_result = h._read_flows_loki( + "http://loki:3100", '{app="hubble"}', 3600, 0, + threads=1, chunk_seconds=7200, + ) policies, _, total, matched, _ = h.parse_flows( - "", LABEL_KEYS, set(), set(), flow_iter=loki_iter + "", LABEL_KEYS, set(), set(), flow_iter=iter(loki_result.flows) ) assert total == 2 assert matched == 2 @@ -602,15 +641,13 @@ def _mock_urlopen(self) -> mock.MagicMock: def test_bearer_token(self) -> None: with mock.patch("hubble_audit2policy.urllib.request.urlopen") as mock_open: mock_open.return_value = self._mock_urlopen() - list( - h._read_flows_loki( - "http://loki:3100", - '{app="hubble"}', - 3600, - 0, - loki_token="my-secret-token", - threads=1, - ) + h._read_flows_loki( + "http://loki:3100", + '{app="hubble"}', + 3600, + 0, + loki_token="my-secret-token", + threads=1, chunk_seconds=7200, ) req = mock_open.call_args[0][0] assert req.get_header("Authorization") == "Bearer my-secret-token" @@ -618,16 +655,14 @@ def test_bearer_token(self) -> None: def test_basic_auth(self) -> None: with mock.patch("hubble_audit2policy.urllib.request.urlopen") as mock_open: mock_open.return_value = self._mock_urlopen() - list( - h._read_flows_loki( - "http://loki:3100", - '{app="hubble"}', - 3600, - 0, - loki_user="admin", - loki_password="s3cret", - threads=1, - ) + h._read_flows_loki( + "http://loki:3100", + '{app="hubble"}', + 3600, + 0, + loki_user="admin", + loki_password="s3cret", + threads=1, chunk_seconds=7200, ) req = mock_open.call_args[0][0] expected = "Basic " + base64.b64encode(b"admin:s3cret").decode("ascii") @@ -636,15 +671,13 @@ def test_basic_auth(self) -> None: def test_basic_auth_no_password(self) -> None: with mock.patch("hubble_audit2policy.urllib.request.urlopen") as mock_open: mock_open.return_value = self._mock_urlopen() - list( - h._read_flows_loki( - "http://loki:3100", - '{app="hubble"}', - 3600, - 0, - loki_user="admin", - threads=1, - ) + h._read_flows_loki( + "http://loki:3100", + '{app="hubble"}', + 3600, + 0, + loki_user="admin", + threads=1, chunk_seconds=7200, ) req = mock_open.call_args[0][0] expected = "Basic " + base64.b64encode(b"admin:").decode("ascii") @@ -653,7 +686,10 @@ def test_basic_auth_no_password(self) -> None: def test_no_auth_header_by_default(self) -> None: with mock.patch("hubble_audit2policy.urllib.request.urlopen") as mock_open: mock_open.return_value = self._mock_urlopen() - list(h._read_flows_loki("http://loki:3100", '{app="hubble"}', 3600, 0, threads=1)) + h._read_flows_loki( + "http://loki:3100", '{app="hubble"}', 3600, 0, + threads=1, chunk_seconds=7200, + ) req = mock_open.call_args[0][0] assert req.get_header("Authorization") is None @@ -663,15 +699,13 @@ def test_tls_ca_cert(self) -> None: mock.patch("hubble_audit2policy.ssl.create_default_context") as mock_ctx, ): mock_open.return_value = self._mock_urlopen() - list( - h._read_flows_loki( - "https://loki:3100", - '{app="hubble"}', - 3600, - 0, - loki_tls_ca="/path/to/ca.pem", - threads=1, - ) + h._read_flows_loki( + "https://loki:3100", + '{app="hubble"}', + 3600, + 0, + loki_tls_ca="/path/to/ca.pem", + threads=1, chunk_seconds=7200, ) mock_ctx.assert_called_once_with(cafile="/path/to/ca.pem") # The context should be passed to urlopen. @@ -685,21 +719,39 @@ def test_tls_ca_with_bearer(self) -> None: mock.patch("hubble_audit2policy.ssl.create_default_context") as mock_ctx, ): mock_open.return_value = self._mock_urlopen() - list( - h._read_flows_loki( - "https://loki:3100", - '{app="hubble"}', - 3600, - 0, - loki_token="tok", - loki_tls_ca="/path/to/ca.pem", - threads=1, - ) + h._read_flows_loki( + "https://loki:3100", + '{app="hubble"}', + 3600, + 0, + loki_token="tok", + loki_tls_ca="/path/to/ca.pem", + threads=1, chunk_seconds=7200, ) req = mock_open.call_args[0][0] assert req.get_header("Authorization") == "Bearer tok" mock_ctx.assert_called_once_with(cafile="/path/to/ca.pem") + def test_enrich_query_no_namespaces(self) -> None: + q = '{container="cilium-agent"}' + assert h._loki_enrich_query(q, []) == q + assert h._loki_enrich_query(q, set()) == q + + def test_enrich_query_single_namespace(self) -> None: + q = '{container="cilium-agent"}' + result = h._loki_enrich_query(q, ["argocd"]) + assert result == r'{container="cilium-agent"} |= "\"namespace\":\"argocd\""' + + def test_enrich_query_multiple_namespaces(self) -> None: + q = '{container="cilium-agent"}' + result = h._loki_enrich_query(q, ["monitoring", "argocd"]) + assert result == r'{container="cilium-agent"} |~ "\"namespace\":\"(argocd|monitoring)\""' + + def test_enrich_query_deduplicates(self) -> None: + q = '{container="cilium-agent"}' + result = h._loki_enrich_query(q, ["argocd", "argocd"]) + assert result == r'{container="cilium-agent"} |= "\"namespace\":\"argocd\""' + def test_build_loki_ssl_context_none(self) -> None: assert h._build_loki_ssl_context(None) is None assert h._build_loki_ssl_context("") is None