diff --git a/hubble_audit2policy.py b/hubble_audit2policy.py index b789a7a..6b06973 100755 --- a/hubble_audit2policy.py +++ b/hubble_audit2policy.py @@ -59,6 +59,7 @@ AppPods, ] + # Result from _read_flows_loki(). class LokiResult: __slots__ = ("flows", "partial") @@ -625,9 +626,7 @@ def _read_flows_loki( done = 0 with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as pool: futures = [ - pool.submit( - _loki_fetch_chunk, base, query, s, e, limit, headers, ssl_ctx, timeout - ) + pool.submit(_loki_fetch_chunk, base, query, s, e, limit, headers, ssl_ctx, timeout) for s, e in segments ] try: diff --git a/tests/test_flow_parsing.py b/tests/test_flow_parsing.py index c09fb02..d537ccf 100644 --- a/tests/test_flow_parsing.py +++ b/tests/test_flow_parsing.py @@ -291,8 +291,12 @@ def test_basic_fetch(self) -> None: mock_open.return_value = mock_resp result = h._read_flows_loki( - "http://loki:3100", '{app="hubble"}', 3600, 0, - threads=1, chunk_seconds=7200, + "http://loki:3100", + '{app="hubble"}', + 3600, + 0, + threads=1, + chunk_seconds=7200, ).flows assert len(result) == 2 assert result[0][1]["l4"]["TCP"]["destination_port"] == 80 @@ -309,8 +313,12 @@ def test_error_response(self) -> None: mock_open.return_value = mock_resp result = h._read_flows_loki( - "http://loki:3100", '{app="hubble"}', 3600, 0, - threads=1, chunk_seconds=7200, + "http://loki:3100", + '{app="hubble"}', + 3600, + 0, + threads=1, + chunk_seconds=7200, ).flows assert len(result) == 0 @@ -318,8 +326,12 @@ def test_connection_error(self) -> None: with mock.patch("hubble_audit2policy.urllib.request.urlopen") as mock_open: mock_open.side_effect = OSError("connection refused") result = h._read_flows_loki( - "http://loki:3100", '{app="hubble"}', 3600, 0, - threads=1, chunk_seconds=7200, + "http://loki:3100", + '{app="hubble"}', + 3600, + 0, + threads=1, + chunk_seconds=7200, ).flows assert len(result) == 0 @@ -344,8 +356,12 @@ def test_malformed_json_skipped(self) -> None: mock_open.return_value = mock_resp result = h._read_flows_loki( - "http://loki:3100", '{app="hubble"}', 3600, 0, - threads=1, chunk_seconds=7200, + "http://loki:3100", + '{app="hubble"}', + 3600, + 0, + threads=1, + chunk_seconds=7200, ).flows assert len(result) == 2 @@ -404,8 +420,13 @@ def test_pagination(self) -> None: mock_open.side_effect = responses # limit=1 forces pagination after each entry. result = h._read_flows_loki( - "http://loki:3100", '{app="hubble"}', 3600, 0, limit=1, - threads=1, chunk_seconds=7200, + "http://loki:3100", + '{app="hubble"}', + 3600, + 0, + limit=1, + threads=1, + chunk_seconds=7200, ).flows assert len(result) == 2 assert result[0][1]["l4"]["TCP"]["destination_port"] == 80 @@ -445,8 +466,12 @@ def test_multiple_streams(self) -> None: mock_open.return_value = mock_resp result = h._read_flows_loki( - "http://loki:3100", '{app="hubble"}', 3600, 0, - threads=1, chunk_seconds=7200, + "http://loki:3100", + '{app="hubble"}', + 3600, + 0, + threads=1, + chunk_seconds=7200, ).flows assert len(result) == 2 @@ -467,8 +492,13 @@ def test_query_params_forwarded(self) -> None: mock_open.return_value = mock_resp h._read_flows_loki( - "http://loki:3100", '{namespace="hubble"}', 7200, 0, limit=100, - threads=1, chunk_seconds=14400, + "http://loki:3100", + '{namespace="hubble"}', + 7200, + 0, + limit=100, + threads=1, + chunk_seconds=14400, ) req = mock_open.call_args[0][0] url = req.full_url @@ -503,8 +533,12 @@ def test_empty_log_lines_skipped(self) -> None: mock_open.return_value = mock_resp result = h._read_flows_loki( - "http://loki:3100", '{app="hubble"}', 3600, 0, - threads=1, chunk_seconds=7200, + "http://loki:3100", + '{app="hubble"}', + 3600, + 0, + threads=1, + chunk_seconds=7200, ).flows assert len(result) == 1 @@ -542,8 +576,12 @@ def _fake_urlopen(req, **kwargs): # noqa: ARG001 with mock.patch("hubble_audit2policy.urllib.request.urlopen", side_effect=_fake_urlopen): result = h._read_flows_loki( - "http://loki:3100", '{app="hubble"}', 3600, 0, - threads=2, chunk_seconds=1800, + "http://loki:3100", + '{app="hubble"}', + 3600, + 0, + threads=2, + chunk_seconds=1800, ).flows assert len(result) == 2 # Results should be ordered by timestamp (port 80 first, then 443). @@ -560,11 +598,7 @@ def test_chunk_seconds_splits_into_many_requests(self) -> None: def _fake_urlopen(req, **kwargs): # noqa: ARG001 body = { "status": "success", - "data": { - "result": [ - {"stream": {}, "values": [["1000000000", json.dumps(flow)]]} - ] - }, + "data": {"result": [{"stream": {}, "values": [["1000000000", json.dumps(flow)]]}]}, } resp = mock.MagicMock() resp.read.return_value = json.dumps(body).encode() @@ -577,8 +611,12 @@ def _fake_urlopen(req, **kwargs): # noqa: ARG001 ) as mock_open: # 1h range with 600s chunks = 6 chunks, but only 2 worker threads. result = h._read_flows_loki( - "http://loki:3100", '{app="hubble"}', 3600, 0, - threads=2, chunk_seconds=600, + "http://loki:3100", + '{app="hubble"}', + 3600, + 0, + threads=2, + chunk_seconds=600, ).flows # 6 chunks, each returning 1 flow. assert len(result) == 6 @@ -609,8 +647,12 @@ def test_loki_flows_produce_policies(self) -> None: mock_open.return_value = mock_resp loki_result = h._read_flows_loki( - "http://loki:3100", '{app="hubble"}', 3600, 0, - threads=1, chunk_seconds=7200, + "http://loki:3100", + '{app="hubble"}', + 3600, + 0, + threads=1, + chunk_seconds=7200, ) policies, _, total, matched, _ = h.parse_flows( "", LABEL_KEYS, set(), set(), flow_iter=iter(loki_result.flows) @@ -647,7 +689,8 @@ def test_bearer_token(self) -> None: 3600, 0, loki_token="my-secret-token", - threads=1, chunk_seconds=7200, + threads=1, + chunk_seconds=7200, ) req = mock_open.call_args[0][0] assert req.get_header("Authorization") == "Bearer my-secret-token" @@ -662,7 +705,8 @@ def test_basic_auth(self) -> None: 0, loki_user="admin", loki_password="s3cret", - threads=1, chunk_seconds=7200, + threads=1, + chunk_seconds=7200, ) req = mock_open.call_args[0][0] expected = "Basic " + base64.b64encode(b"admin:s3cret").decode("ascii") @@ -677,7 +721,8 @@ def test_basic_auth_no_password(self) -> None: 3600, 0, loki_user="admin", - threads=1, chunk_seconds=7200, + threads=1, + chunk_seconds=7200, ) req = mock_open.call_args[0][0] expected = "Basic " + base64.b64encode(b"admin:").decode("ascii") @@ -687,8 +732,12 @@ def test_no_auth_header_by_default(self) -> None: with mock.patch("hubble_audit2policy.urllib.request.urlopen") as mock_open: mock_open.return_value = self._mock_urlopen() h._read_flows_loki( - "http://loki:3100", '{app="hubble"}', 3600, 0, - threads=1, chunk_seconds=7200, + "http://loki:3100", + '{app="hubble"}', + 3600, + 0, + threads=1, + chunk_seconds=7200, ) req = mock_open.call_args[0][0] assert req.get_header("Authorization") is None @@ -705,7 +754,8 @@ def test_tls_ca_cert(self) -> None: 3600, 0, loki_tls_ca="/path/to/ca.pem", - threads=1, chunk_seconds=7200, + threads=1, + chunk_seconds=7200, ) mock_ctx.assert_called_once_with(cafile="/path/to/ca.pem") # The context should be passed to urlopen. @@ -726,7 +776,8 @@ def test_tls_ca_with_bearer(self) -> None: 0, loki_token="tok", loki_tls_ca="/path/to/ca.pem", - threads=1, chunk_seconds=7200, + threads=1, + chunk_seconds=7200, ) req = mock_open.call_args[0][0] assert req.get_header("Authorization") == "Bearer tok"