Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions hubble_audit2policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
AppPods,
]


# Result from _read_flows_loki().
class LokiResult:
__slots__ = ("flows", "partial")
Expand Down Expand Up @@ -625,9 +626,7 @@ def _read_flows_loki(
done = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as pool:
futures = [
pool.submit(
_loki_fetch_chunk, base, query, s, e, limit, headers, ssl_ctx, timeout
)
pool.submit(_loki_fetch_chunk, base, query, s, e, limit, headers, ssl_ctx, timeout)
for s, e in segments
]
try:
Expand Down
119 changes: 85 additions & 34 deletions tests/test_flow_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,12 @@ def test_basic_fetch(self) -> None:
mock_open.return_value = mock_resp

result = h._read_flows_loki(
"http://loki:3100", '{app="hubble"}', 3600, 0,
threads=1, chunk_seconds=7200,
"http://loki:3100",
'{app="hubble"}',
3600,
0,
threads=1,
chunk_seconds=7200,
).flows
assert len(result) == 2
assert result[0][1]["l4"]["TCP"]["destination_port"] == 80
Expand All @@ -309,17 +313,25 @@ def test_error_response(self) -> None:
mock_open.return_value = mock_resp

result = h._read_flows_loki(
"http://loki:3100", '{app="hubble"}', 3600, 0,
threads=1, chunk_seconds=7200,
"http://loki:3100",
'{app="hubble"}',
3600,
0,
threads=1,
chunk_seconds=7200,
).flows
assert len(result) == 0

def test_connection_error(self) -> None:
with mock.patch("hubble_audit2policy.urllib.request.urlopen") as mock_open:
mock_open.side_effect = OSError("connection refused")
result = h._read_flows_loki(
"http://loki:3100", '{app="hubble"}', 3600, 0,
threads=1, chunk_seconds=7200,
"http://loki:3100",
'{app="hubble"}',
3600,
0,
threads=1,
chunk_seconds=7200,
).flows
assert len(result) == 0

Expand All @@ -344,8 +356,12 @@ def test_malformed_json_skipped(self) -> None:
mock_open.return_value = mock_resp

result = h._read_flows_loki(
"http://loki:3100", '{app="hubble"}', 3600, 0,
threads=1, chunk_seconds=7200,
"http://loki:3100",
'{app="hubble"}',
3600,
0,
threads=1,
chunk_seconds=7200,
).flows
assert len(result) == 2

Expand Down Expand Up @@ -404,8 +420,13 @@ def test_pagination(self) -> None:
mock_open.side_effect = responses
# limit=1 forces pagination after each entry.
result = h._read_flows_loki(
"http://loki:3100", '{app="hubble"}', 3600, 0, limit=1,
threads=1, chunk_seconds=7200,
"http://loki:3100",
'{app="hubble"}',
3600,
0,
limit=1,
threads=1,
chunk_seconds=7200,
).flows
assert len(result) == 2
assert result[0][1]["l4"]["TCP"]["destination_port"] == 80
Expand Down Expand Up @@ -445,8 +466,12 @@ def test_multiple_streams(self) -> None:
mock_open.return_value = mock_resp

result = h._read_flows_loki(
"http://loki:3100", '{app="hubble"}', 3600, 0,
threads=1, chunk_seconds=7200,
"http://loki:3100",
'{app="hubble"}',
3600,
0,
threads=1,
chunk_seconds=7200,
).flows
assert len(result) == 2

Expand All @@ -467,8 +492,13 @@ def test_query_params_forwarded(self) -> None:
mock_open.return_value = mock_resp

h._read_flows_loki(
"http://loki:3100", '{namespace="hubble"}', 7200, 0, limit=100,
threads=1, chunk_seconds=14400,
"http://loki:3100",
'{namespace="hubble"}',
7200,
0,
limit=100,
threads=1,
chunk_seconds=14400,
)
req = mock_open.call_args[0][0]
url = req.full_url
Expand Down Expand Up @@ -503,8 +533,12 @@ def test_empty_log_lines_skipped(self) -> None:
mock_open.return_value = mock_resp

result = h._read_flows_loki(
"http://loki:3100", '{app="hubble"}', 3600, 0,
threads=1, chunk_seconds=7200,
"http://loki:3100",
'{app="hubble"}',
3600,
0,
threads=1,
chunk_seconds=7200,
).flows
assert len(result) == 1

Expand Down Expand Up @@ -542,8 +576,12 @@ def _fake_urlopen(req, **kwargs): # noqa: ARG001

with mock.patch("hubble_audit2policy.urllib.request.urlopen", side_effect=_fake_urlopen):
result = h._read_flows_loki(
"http://loki:3100", '{app="hubble"}', 3600, 0,
threads=2, chunk_seconds=1800,
"http://loki:3100",
'{app="hubble"}',
3600,
0,
threads=2,
chunk_seconds=1800,
).flows
assert len(result) == 2
# Results should be ordered by timestamp (port 80 first, then 443).
Expand All @@ -560,11 +598,7 @@ def test_chunk_seconds_splits_into_many_requests(self) -> None:
def _fake_urlopen(req, **kwargs): # noqa: ARG001
body = {
"status": "success",
"data": {
"result": [
{"stream": {}, "values": [["1000000000", json.dumps(flow)]]}
]
},
"data": {"result": [{"stream": {}, "values": [["1000000000", json.dumps(flow)]]}]},
}
resp = mock.MagicMock()
resp.read.return_value = json.dumps(body).encode()
Expand All @@ -577,8 +611,12 @@ def _fake_urlopen(req, **kwargs): # noqa: ARG001
) as mock_open:
# 1h range with 600s chunks = 6 chunks, but only 2 worker threads.
result = h._read_flows_loki(
"http://loki:3100", '{app="hubble"}', 3600, 0,
threads=2, chunk_seconds=600,
"http://loki:3100",
'{app="hubble"}',
3600,
0,
threads=2,
chunk_seconds=600,
).flows
# 6 chunks, each returning 1 flow.
assert len(result) == 6
Expand Down Expand Up @@ -609,8 +647,12 @@ def test_loki_flows_produce_policies(self) -> None:
mock_open.return_value = mock_resp

loki_result = h._read_flows_loki(
"http://loki:3100", '{app="hubble"}', 3600, 0,
threads=1, chunk_seconds=7200,
"http://loki:3100",
'{app="hubble"}',
3600,
0,
threads=1,
chunk_seconds=7200,
)
policies, _, total, matched, _ = h.parse_flows(
"", LABEL_KEYS, set(), set(), flow_iter=iter(loki_result.flows)
Expand Down Expand Up @@ -647,7 +689,8 @@ def test_bearer_token(self) -> None:
3600,
0,
loki_token="my-secret-token",
threads=1, chunk_seconds=7200,
threads=1,
chunk_seconds=7200,
)
req = mock_open.call_args[0][0]
assert req.get_header("Authorization") == "Bearer my-secret-token"
Expand All @@ -662,7 +705,8 @@ def test_basic_auth(self) -> None:
0,
loki_user="admin",
loki_password="s3cret",
threads=1, chunk_seconds=7200,
threads=1,
chunk_seconds=7200,
)
req = mock_open.call_args[0][0]
expected = "Basic " + base64.b64encode(b"admin:s3cret").decode("ascii")
Expand All @@ -677,7 +721,8 @@ def test_basic_auth_no_password(self) -> None:
3600,
0,
loki_user="admin",
threads=1, chunk_seconds=7200,
threads=1,
chunk_seconds=7200,
)
req = mock_open.call_args[0][0]
expected = "Basic " + base64.b64encode(b"admin:").decode("ascii")
Expand All @@ -687,8 +732,12 @@ def test_no_auth_header_by_default(self) -> None:
with mock.patch("hubble_audit2policy.urllib.request.urlopen") as mock_open:
mock_open.return_value = self._mock_urlopen()
h._read_flows_loki(
"http://loki:3100", '{app="hubble"}', 3600, 0,
threads=1, chunk_seconds=7200,
"http://loki:3100",
'{app="hubble"}',
3600,
0,
threads=1,
chunk_seconds=7200,
)
req = mock_open.call_args[0][0]
assert req.get_header("Authorization") is None
Expand All @@ -705,7 +754,8 @@ def test_tls_ca_cert(self) -> None:
3600,
0,
loki_tls_ca="/path/to/ca.pem",
threads=1, chunk_seconds=7200,
threads=1,
chunk_seconds=7200,
)
mock_ctx.assert_called_once_with(cafile="/path/to/ca.pem")
# The context should be passed to urlopen.
Expand All @@ -726,7 +776,8 @@ def test_tls_ca_with_bearer(self) -> None:
0,
loki_token="tok",
loki_tls_ca="/path/to/ca.pem",
threads=1, chunk_seconds=7200,
threads=1,
chunk_seconds=7200,
)
req = mock_open.call_args[0][0]
assert req.get_header("Authorization") == "Bearer tok"
Expand Down
Loading