Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ hubble-audit2policy [-h] [-o OUTPUT_DIR] [-n NAMESPACE]
[--until DURATION] [--loki-limit N]
[--loki-user USER] [--loki-password PASSWORD]
[--loki-token TOKEN] [--loki-tls-ca PATH]
[--loki-org-id ORG_ID]
[--loki-org-id ORG_ID] [--loki-threads N]
[--loki-chunk DURATION] [--loki-timeout SECONDS]
[-v] [-V]
[flows_file]
```
Expand All @@ -241,12 +242,15 @@ hubble-audit2policy [-h] [-o OUTPUT_DIR] [-n NAMESPACE]
| `--loki-query LOGQL` | LogQL stream selector (default: `{container="cilium-agent"}`) |
| `--since DURATION` | How far back to query, e.g. `30m`, `2h`, `1d` (default: `1h`) |
| `--until DURATION` | End of query window as duration before now (default: `0s` = now) |
| `--loki-limit N` | Max entries per Loki request batch (default: `1000`) |
| `--loki-limit N` | Max entries per Loki request batch (default: `5000`) |
| `--loki-user USER` | Username for Loki HTTP Basic authentication |
| `--loki-password PASSWORD` | Password for Loki HTTP Basic authentication (used with `--loki-user`) |
| `--loki-token TOKEN` | Bearer token for Loki (`Authorization: Bearer ...`) header |
| `--loki-tls-ca PATH` | Path to a PEM CA certificate for verifying the Loki server (self-signed certs) |
| `--loki-org-id ORG_ID` | Tenant ID sent as `X-Scope-OrgID` header (required when Loki `auth_enabled=true`) |
| `--loki-threads N` | Number of parallel worker threads for Loki queries (default: `8`) |
| `--loki-chunk DURATION` | Max time window per Loki request, e.g. `5s`, `30s`, `1m` (default: `5m` with `-n`, `5s` otherwise) |
| `--loki-timeout SECONDS` | HTTP request timeout in seconds for each Loki request (default: `30`) |
| `-v, --verbose` | Enable verbose logging |
| `-V, --version` | Show version and exit |

Expand Down
214 changes: 168 additions & 46 deletions hubble_audit2policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from __future__ import annotations

__version__ = "0.11.0"
__version__ = "0.12.0"
__author__ = "noexecstack"
__license__ = "Apache-2.0"

Expand All @@ -33,7 +33,7 @@
import urllib.parse
import urllib.request
from collections import Counter, defaultdict, deque
from collections.abc import Generator, Iterator
from collections.abc import Generator, Iterable, Iterator
from contextlib import contextmanager
from typing import IO, Any, ClassVar, cast

Expand All @@ -59,6 +59,15 @@
AppPods,
]

# Result from _read_flows_loki().
class LokiResult:
    """Result of a Loki fetch performed by ``_read_flows_loki()``.

    Attributes
    ----------
    flows:
        ``(lineno, flow_dict)`` pairs with monotonically increasing
        line numbers, as produced by the fetch-and-merge step.
    partial:
        True when the fetch was interrupted before completing, so
        *flows* may cover only part of the requested time range.
    """

    __slots__ = ("flows", "partial")

    def __init__(self, flows: list[tuple[int, dict[str, Any]]], partial: bool) -> None:
        self.flows = flows
        self.partial = partial

    def __repr__(self) -> str:
        # Flow dicts can be large; show only the count for a compact repr.
        return f"LokiResult(flows=<{len(self.flows)} entries>, partial={self.partial})"


# Exit codes
EXIT_OK = 0
EXIT_ERROR = 1
Expand Down Expand Up @@ -426,6 +435,26 @@ def _build_loki_ssl_context(
return ctx


def _loki_enrich_query(query: str, namespaces: Iterable[str]) -> str:
r"""Append LogQL line filters for *namespaces* to *query*.

When the user passes ``-n argocd``, we inject a filter that matches
the JSON field pattern ``"namespace":"argocd"`` so Loki filters lines
server-side. This avoids false positives from pod names or labels
that happen to contain the namespace string.

Multiple namespaces are OR-joined into a single regex filter, e.g.
``"namespace":"(argocd|monitoring)"``.
"""
ns_list = sorted(set(namespaces))
if not ns_list:
return query
if len(ns_list) == 1:
return query + r' |= "\"namespace\":\"' + ns_list[0] + r'\""'
pattern = "|".join(re.escape(ns) for ns in ns_list)
return query + r' |~ "\"namespace\":\"(' + pattern + r')\""'


def _loki_fetch_chunk(
base_url: str,
query: str,
Expand All @@ -434,6 +463,7 @@ def _loki_fetch_chunk(
limit: int,
headers: dict[str, str],
ssl_ctx: ssl.SSLContext | None,
timeout: int = 30,
) -> list[tuple[int, dict[str, Any]]]:
"""Fetch log entries from Loki for a single time segment.

Expand All @@ -457,7 +487,7 @@ def _loki_fetch_chunk(

req = urllib.request.Request(url, headers=headers)
try:
with urllib.request.urlopen(req, timeout=120, context=ssl_ctx) as resp:
with urllib.request.urlopen(req, timeout=timeout, context=ssl_ctx) as resp:
body = json.loads(resp.read().decode("utf-8"))
except (urllib.error.URLError, OSError) as exc:
LOG.error("Failed to query Loki: %s", exc)
Expand Down Expand Up @@ -507,13 +537,19 @@ def _read_flows_loki(
loki_token: str | None = None,
loki_tls_ca: str | None = None,
loki_org_id: str | None = None,
threads: int = 4,
) -> Iterator[tuple[int, dict[str, Any]]]:
"""Yield *(lineno, flow_dict)* by querying a Loki instance.
threads: int = 8,
chunk_seconds: float = 5,
timeout: int = 30,
) -> LokiResult:
"""Fetch flows from a Loki instance.

Returns a `LokiResult` with *(lineno, flow_dict)* pairs and a
*partial* flag indicating whether the fetch was interrupted.

The query time range is split into *threads* equal segments and
fetched in parallel using a thread pool, then merged in timestamp
order with monotonically increasing line numbers.
The query time range is split into small time chunks (controlled by
*chunk_seconds*) and fetched in parallel using a thread pool of
*threads* workers, then merged in timestamp order with monotonically
increasing line numbers.

Parameters
----------
Expand All @@ -536,7 +572,13 @@ def _read_flows_loki(
loki_tls_ca:
Path to a PEM CA certificate for TLS verification.
threads:
Number of parallel time segments (default: 4).
        Number of parallel worker threads (default: 8).
chunk_seconds:
        Maximum time window per Loki request in seconds (default: 5).
Smaller chunks reduce per-request load and are less likely to
time out.
timeout:
HTTP request timeout in seconds (default: 30).
"""
now = time.time()
start_ns = int((now - since_seconds) * 1_000_000_000)
Expand All @@ -556,35 +598,62 @@ def _read_flows_loki(

ssl_ctx = _build_loki_ssl_context(loki_tls_ca)

# Determine how many parallel segments to use. Avoid creating more
# segments than there are seconds in the range.
# Split the time range into small chunks. Each chunk covers at most
# *chunk_seconds* of wall time so individual Loki requests stay fast.
span = end_ns - start_ns
num_segments = max(1, min(threads, span // 1_000_000_000))
chunk_ns = max(int(chunk_seconds * 1_000_000_000), 1_000_000_000) # floor 1s
num_chunks = max(1, -(-span // chunk_ns)) # ceiling division

if num_segments <= 1:
# Fast path: single segment, no thread pool overhead.
entries = _loki_fetch_chunk(base, query, start_ns, end_ns, limit, headers, ssl_ctx)
else:
boundaries = [start_ns + span * i // num_segments for i in range(num_segments)]
boundaries.append(end_ns)
segments = [(boundaries[i], boundaries[i + 1]) for i in range(num_segments)]
boundaries = [start_ns + span * i // num_chunks for i in range(num_chunks)]
boundaries.append(end_ns)
segments = [(boundaries[i], boundaries[i + 1]) for i in range(num_chunks)]

interrupted = False

if len(segments) <= 1:
# Fast path: single chunk, no thread pool overhead.
try:
entries = _loki_fetch_chunk(
base, query, start_ns, end_ns, limit, headers, ssl_ctx, timeout
)
except KeyboardInterrupt:
interrupted = True
entries = []
else:
num_workers = min(threads, len(segments))
all_entries: list[tuple[int, dict[str, Any]]] = []
with concurrent.futures.ThreadPoolExecutor(max_workers=num_segments) as pool:
done = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as pool:
futures = [
pool.submit(_loki_fetch_chunk, base, query, s, e, limit, headers, ssl_ctx)
pool.submit(
_loki_fetch_chunk, base, query, s, e, limit, headers, ssl_ctx, timeout
)
for s, e in segments
]
for future in concurrent.futures.as_completed(futures):
all_entries.extend(future.result())
try:
for future in concurrent.futures.as_completed(futures):
all_entries.extend(future.result())
done += 1
pct = done * 100 // len(segments)
print(
f"\r Fetched chunk {done}/{len(segments)} ({pct}%) "
f"-- {len(all_entries)} flows so far ...",
end="",
file=sys.stderr,
)
except KeyboardInterrupt:
interrupted = True
for f in futures:
f.cancel()
print(file=sys.stderr) # newline after progress

entries = all_entries

entries.sort(key=lambda x: x[0])
LOG.debug("Fetched %d log entries from Loki", len(entries))

for lineno, (_, flow) in enumerate(entries, 1):
yield lineno, flow
flows = [(lineno, flow) for lineno, (_, flow) in enumerate(entries, 1)]
return LokiResult(flows=flows, partial=interrupted)


def parse_flows(
Expand Down Expand Up @@ -1474,8 +1543,10 @@ def _draw_loki_header(
since: str,
until: str,
flow_count: int,
load_seconds: float,
is_selecting: bool,
selected_count: int,
partial: bool = False,
) -> None:
"""Render the fixed header rows (0-2) for Loki watch mode TUI."""
# Row 0: source + timestamp
Expand All @@ -1491,8 +1562,12 @@ def _draw_loki_header(

# Row 1: time range + flow count + status badge
prefix = f"Range: since={since} until={until} | Flows: {flow_count} "
badge = "* Loaded"
badge_attr = curses.color_pair(1) if curses.has_colors() else curses.A_NORMAL
if partial:
badge = f"* Partial results (interrupted) -- {load_seconds:.1f}s"
badge_attr = curses.color_pair(2) if curses.has_colors() else curses.A_NORMAL
else:
badge = f"* Loaded in {load_seconds:.1f}s"
badge_attr = curses.color_pair(1) if curses.has_colors() else curses.A_NORMAL
try:
stdscr.addnstr(1, 0, prefix, width - 1)
stdscr.addnstr(1, len(prefix), badge, width - 1 - len(prefix), badge_attr)
Expand Down Expand Up @@ -1591,17 +1666,20 @@ def _loki_watch_mode(args: argparse.Namespace) -> None:

since_sec = _parse_duration(args.since)
until_sec = _parse_duration(args.until)
loki_query = _loki_enrich_query(args.loki_query, namespaces)
chunk_default = "5m" if namespaces else "5s"
chunk_sec = _parse_duration(args.loki_chunk or chunk_default)

print(
f"Querying Loki at {args.loki_url} "
f"(query={args.loki_query!r}, since={args.since}, until={args.until}) ...",
f"(query={loki_query!r}, since={args.since}, until={args.until}) ...",
file=sys.stderr,
)

loki_flows: list[dict[str, Any]] = []
for _, flow in _read_flows_loki(
t0 = time.monotonic()
result = _read_flows_loki(
args.loki_url,
args.loki_query,
loki_query,
since_sec,
until_sec,
args.loki_limit,
Expand All @@ -1611,13 +1689,30 @@ def _loki_watch_mode(args: argparse.Namespace) -> None:
loki_tls_ca=args.loki_tls_ca,
loki_org_id=args.loki_org_id,
threads=args.loki_threads,
):
loki_flows.append(flow)
chunk_seconds=chunk_sec,
timeout=args.loki_timeout,
)
load_elapsed = time.monotonic() - t0
partial = result.partial
loki_flows = [flow for _, flow in result.flows]

print(f"Loaded {len(loki_flows)} flows from Loki.", file=sys.stderr)
if partial:
print(
f"Interrupted -- continuing with {len(loki_flows)} flows "
f"fetched in {load_elapsed:.1f}s.",
file=sys.stderr,
)
else:
print(
f"Loaded {len(loki_flows)} flows from Loki in {load_elapsed:.1f}s.",
file=sys.stderr,
)

if not loki_flows:
LOG.warning("No flows returned from Loki query")
if partial:
print("No flows fetched before interrupt.", file=sys.stderr)
else:
LOG.warning("No flows returned from Loki query")
sys.exit(EXIT_NO_POLICIES)

# Header layout matches live watch mode.
Expand Down Expand Up @@ -1746,7 +1841,9 @@ def _run(stdscr: curses.window) -> None: # type: ignore[name-defined]
since=args.since,
until=args.until,
flow_count=len(loki_flows),
load_seconds=load_elapsed,
is_selecting=is_selecting,
partial=partial,
selected_count=len(selected_keys),
)
_draw_content(
Expand Down Expand Up @@ -2340,9 +2437,23 @@ def _build_parser() -> argparse.ArgumentParser:
loki_group.add_argument(
"--loki-threads",
type=int,
default=4,
default=8,
metavar="N",
help="Number of parallel time segments for Loki queries (default: 4)",
help="Number of parallel worker threads for Loki queries (default: 8)",
)
loki_group.add_argument(
"--loki-chunk",
default=None,
metavar="DURATION",
help="Max time window per Loki request, e.g. 5s, 30s, 1m "
"(default: 5m when -n/--namespace filters are active, 5s otherwise)",
)
loki_group.add_argument(
"--loki-timeout",
type=int,
default=30,
metavar="SECONDS",
help="HTTP request timeout in seconds for each Loki request (default: 30)",
)

parser.add_argument(
Expand Down Expand Up @@ -2391,9 +2502,17 @@ def main() -> None:
parser.error("--loki-password requires --loki-user")
since_sec = _parse_duration(args.since)
until_sec = _parse_duration(args.until)
loki_iter = _read_flows_loki(
loki_query = _loki_enrich_query(args.loki_query, namespaces)
chunk_default = "5m" if namespaces else "5s"
chunk_sec = _parse_duration(args.loki_chunk or chunk_default)
print(
f"Querying Loki at {args.loki_url} "
f"(query={loki_query!r}, since={args.since}, until={args.until}) ...",
file=sys.stderr,
)
loki_result = _read_flows_loki(
args.loki_url,
args.loki_query,
loki_query,
since_sec,
until_sec,
args.loki_limit,
Expand All @@ -2403,14 +2522,17 @@ def main() -> None:
loki_tls_ca=args.loki_tls_ca,
loki_org_id=args.loki_org_id,
threads=args.loki_threads,
chunk_seconds=chunk_sec,
timeout=args.loki_timeout,
)
print(
f"Querying Loki at {args.loki_url} "
f"(query={args.loki_query!r}, since={args.since}, until={args.until}) ...",
file=sys.stderr,
)
if loki_result.partial:
print(
f"Interrupted -- generating output from {len(loki_result.flows)} "
f"flows fetched so far.",
file=sys.stderr,
)
policies, flow_counts, total, matched, app_pods = parse_flows(
"", label_keys, verdicts, namespaces, flow_iter=loki_iter
"", label_keys, verdicts, namespaces, flow_iter=iter(loki_result.flows)
)
else:
if not args.flows_file:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "hubble-audit2policy"
version = "0.11.0"
version = "0.12.0"
description = "Generate least-privilege CiliumNetworkPolicy YAML from Hubble flow logs."
readme = "README.md"
license = "Apache-2.0"
Expand Down
Loading
Loading