-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_shared.py
More file actions
362 lines (305 loc) · 13.2 KB
/
_shared.py
File metadata and controls
362 lines (305 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
# SPDX-FileCopyrightText: 2026 KustoKing / SecM8
# SPDX-License-Identifier: Apache-2.0
"""Shared helpers used by 2+ command modules in :mod:`contentops.cli.commands`.
Kept in one place so individual command modules can import the
helpers they need without circular dependencies.
"""
from __future__ import annotations
import logging
import os
import sys
from pathlib import Path
import click
import yaml
from contentops.core.discovery import discover_assets, load_asset
from contentops.core.handler import LoadedAsset
# ---------------------------------------------------------------------------
# Logging — quiet azure.identity / httpx by default, opt in with -v / -vv
# ---------------------------------------------------------------------------
#
# The legacy CLI calls logging.basicConfig(INFO) which floods the
# terminal with azure.identity probes (~3000 lines per `collect` run on
# a real tenant). Demote those loggers to WARNING by default; the
# top-level pipeline logger stays at INFO so per-asset progress is
# still visible. -v promotes the noisy loggers to INFO; -vv promotes
# them to DEBUG.
_NOISY_LOGGERS = ("azure.identity", "azure.core", "httpx", "urllib3", "msal")


def _apply_log_levels(verbosity: int = 0) -> None:
    """Pin the chatty third-party loggers to a level picked by ``verbosity``.

    Below 1 the loggers named in ``_NOISY_LOGGERS`` are set to WARNING,
    at 1 they are set to INFO, and at 2 or above to DEBUG. No other
    logger is touched.
    """
    if verbosity < 1:
        level = logging.WARNING
    elif verbosity < 2:
        level = logging.INFO
    else:
        level = logging.DEBUG

    for logger_name in _NOISY_LOGGERS:
        logging.getLogger(logger_name).setLevel(level)
# ---------------------------------------------------------------------------
# Run banner — printed at the top of every collect / apply / drift run
# ---------------------------------------------------------------------------
def _print_run_banner(
    command: str,
    detections_path: Path | None = None,
    *,
    extra: dict[str, str] | None = None,
) -> None:
    """Print a tenant + scope banner before any API call.

    The analyst sees what's about to happen (which subscription /
    workspace / api version / output path) before we hit the API, so
    accidentally targeting the wrong tenant is easy to abort.

    Args:
        command: Subcommand name shown in the headline.
        detections_path: When given, echoed on its own ``path`` row.
        extra: Additional ``key : value`` rows, printed in mapping order.

    The banner is best-effort: any failure to import or load the tenant
    config is swallowed, the environment name falls back to
    ``PIPELINE_ENV`` (or ``(unset)``), and only the headline plus the
    caller-supplied rows are printed.
    """
    try:
        from contentops.config import load_tenant_config

        cfg = load_tenant_config()
    except Exception:
        # Deliberately broad: the banner is informational and must not
        # stop the command when the config is missing or unreadable.
        cfg = None

    if cfg is not None:
        env_name = cfg.name
    else:
        env_name = os.getenv("PIPELINE_ENV") or "(unset)"
    click.echo(f"pipeline {command} — {env_name}")

    if cfg is not None:
        active_name = os.environ.get("PIPELINE_WORKSPACE_NAME")
        if active_name:
            try:
                ws = cfg.workspace_by_name(active_name)
            except KeyError:
                # The selected workspace is not in the tenant config. Say
                # so rather than printing no target rows at all, which
                # would hide exactly the mistake the banner is for.
                click.echo(
                    f" workspace : {active_name} (not found in tenant config)"
                )
            else:
                click.echo(f" subscription : {ws.subscriptionId}")
                click.echo(f" resource_group : {ws.resourceGroup}")
                click.echo(f" workspace : {ws.workspaceName} ({ws.role})")
        elif cfg.sentinelWorkspaces:
            # No single workspace selected: list every configured one.
            click.echo(
                " workspaces : "
                + ", ".join(
                    f"{w.workspaceName}({w.role})" for w in cfg.sentinelWorkspaces
                )
            )
        click.echo(
            " api version : 2025-07-01-preview (ARM) / beta (Graph)"
        )

    if detections_path is not None:
        click.echo(f" path : {detections_path}")
    for key, value in (extra or {}).items():
        click.echo(f" {key:<14} : {value}")
    click.echo("")
def _format_summary_table(
    by_asset: dict[str, dict[str, int]],
    *,
    duration_s: float | None = None,
    title: str = "Summary",
) -> list[str]:
    """Render per-asset outcome counts as fixed-width table lines.

    Args:
        by_asset: Maps an asset label to its counts. Only the keys
            ``new``, ``changed``, ``in-sync`` and ``failed`` are read;
            a missing key counts as zero.
        duration_s: When given, appended to the title line with one
            decimal place.
        title: Heading text for the table.

    Returns:
        The title line (with a leading newline), a header row, one row
        per asset in sorted label order, and a closing ``TOTAL`` row.
    """
    columns = ("new", "changed", "in-sync", "failed")

    def _row(label: str, counts: dict[str, int]) -> str:
        # One data row; column widths line up with the header below.
        return (
            f" {label:40s} {counts['new']:>6d} {counts['changed']:>8d} "
            f"{counts['in-sync']:>8d} {counts['failed']:>7d}"
        )

    duration_note = "" if duration_s is None else f" (duration {duration_s:.1f}s)"
    table = [
        f"\n{title}{duration_note}:",
        (
            f" {'asset':40s} {'new':>6s} {'changed':>8s} "
            f"{'in-sync':>8s} {'failed':>7s}"
        ),
    ]

    grand_total = dict.fromkeys(columns, 0)
    for asset_value in sorted(by_asset):
        bucket = by_asset[asset_value]
        counts = {column: bucket.get(column, 0) for column in columns}
        for column in columns:
            grand_total[column] += counts[column]
        table.append(_row(asset_value, counts))

    table.append(_row("TOTAL", grand_total))
    return table
def _load_all(detections_path: Path):
    """Discover and load every asset found under ``detections_path``.

    A path that fails to load is reported on stderr and left out of the
    returned list, so one broken file does not abort the whole run.
    """
    assets = []
    for asset_path in discover_assets(detections_path):
        try:
            asset = load_asset(asset_path)
        except Exception as exc:  # pragma: no cover — defensive
            click.echo(f" load error: {asset_path}: {exc}", err=True)
        else:
            assets.append(asset)
    return assets
def _filter_changed_since(loaded, ref: str):
    """Keep only the assets whose file is reported as changed since ``ref``.

    Each asset's ``path`` is resolved and checked for membership in the
    result of ``changed_paths(ref)``.

    Raises:
        click.ClickException: when the diff helper raises ``GitDiffError``.
    """
    from contentops.utils.git_diff import GitDiffError, changed_paths

    try:
        changed = changed_paths(ref)
    except GitDiffError as exc:
        raise click.ClickException(f"--changed-since={ref}: {exc}") from exc

    kept = []
    for asset in loaded:
        if asset.path.resolve() in changed:
            kept.append(asset)
    return kept
def _emit_dependency_report(loaded) -> bool:
    """Validate dependencies for ``loaded`` and print any violations.

    Returns:
        ``True`` when at least one violation was reported, else ``False``.
        Nothing is printed when there are no violations.
    """
    from contentops.core.dependencies import (
        load_graph as load_dependency_graph,
        validate as validate_dependencies,
    )

    graph = load_dependency_graph()
    violations = validate_dependencies(loaded, graph).violations
    if not violations:
        return False

    click.echo(f"\nDependency check — {len(violations)} violation(s):")
    for violation in violations:
        click.echo(violation.as_row())
    return True
# ---------------------------------------------------------------------------
# Engine-disabled envelope filter (used by plan, apply)
# ---------------------------------------------------------------------------
# Asset kinds grouped by deployment engine. Used by ``_filter_disabled_engines``
# to decide which envelopes to skip when an engine is disabled in tenant.yml.
#
# Derived from the ``Asset`` enum so a future taxonomy addition / rename
# updates both groupings automatically. The enum-prefix convention
# (``sentinel_*`` / ``defender_*``) is the implicit contract; pinned by
# ``test_engine_asset_value_sets_partition_asset_enum`` in
# ``tests/v2/test_optional_engines.py``. Cross-phase review-2 Seam B.
from contentops.core.asset import Asset as _Asset  # local import: avoid top-level cycle # noqa: E402

# Every enum value carrying the ``sentinel_`` prefix.
_SENTINEL_ASSET_VALUES = frozenset(
    value
    for value in (member.value for member in _Asset)
    if value.startswith("sentinel_")
)
# Every enum value carrying the ``defender_`` prefix.
_DEFENDER_ASSET_VALUES = frozenset(
    value
    for value in (member.value for member in _Asset)
    if value.startswith("defender_")
)
def _filter_disabled_engines(loaded: list[LoadedAsset]) -> list[LoadedAsset]:
    """Remove envelopes that belong to an engine disabled in tenant.yml.

    Follows the same gating as
    :func:`contentops.cli.handler_factories.register_default_handlers`:
    Sentinel envelopes are dropped when ``sentinelWorkspaces`` is empty,
    and Defender envelopes are dropped when ``defender:`` is absent or
    has ``enabled: false``. One info line is printed for each engine
    that had envelopes skipped.

    A missing tenant.yml (``FileNotFoundError``) is the only failure
    treated as "both engines enabled", which keeps unit tests that
    bypass config loading working. Any other error raised while loading
    the config is not caught here and propagates to the caller, so a
    malformed config fails at filter time rather than later in a handler.

    Returns:
        ``loaded`` itself when nothing needs filtering, otherwise a new
        list holding only the envelopes that were kept.
    """
    try:
        from contentops.config import load_tenant_config

        cfg = load_tenant_config()
        sentinel_enabled = bool(cfg.sentinelWorkspaces)
        defender_enabled = cfg.defender is not None and cfg.defender.enabled
    except FileNotFoundError:
        return loaded  # no config -> assume both engines

    if sentinel_enabled and defender_enabled:
        return loaded  # nothing to skip

    drop_sentinel = not sentinel_enabled
    drop_defender = not defender_enabled
    sentinel_skips = 0
    defender_skips = 0
    kept: list[LoadedAsset] = []
    for asset in loaded:
        kind = asset.envelope.asset.value
        if drop_sentinel and kind in _SENTINEL_ASSET_VALUES:
            sentinel_skips += 1
        elif drop_defender and kind in _DEFENDER_ASSET_VALUES:
            defender_skips += 1
        else:
            kept.append(asset)

    if sentinel_skips:
        click.echo(
            f" no Sentinel workspaces configured — skipping "
            f"{sentinel_skips} Sentinel envelope(s)"
        )
    if defender_skips:
        click.echo(
            f" Defender disabled in tenant.yml — skipping "
            f"{defender_skips} defender_custom_detection envelope(s)"
        )
    return kept
# ---------------------------------------------------------------------------
# Lock detection (used by apply, prune, rollback)
# ---------------------------------------------------------------------------
def _is_locked(loaded: LoadedAsset) -> bool:
    """Report whether the asset's YAML file sets ``localCustomization: true``.

    The file at ``loaded.path`` is re-read and parsed here, and only a
    top-level key whose value is exactly ``True`` counts as locked. The
    flag is intentionally kept off the strict envelope schema so an
    analyst can lock a rule without a model migration.

    Returns:
        ``False`` when the file cannot be read or parsed, or when the
        parsed document is not a mapping.
    """
    try:
        text = loaded.path.read_text(encoding="utf-8")
        document = yaml.safe_load(text)
    except Exception:
        return False
    return isinstance(document, dict) and document.get("localCustomization") is True
# ---------------------------------------------------------------------------
# Single-workspace selector (used by prune, drift)
# ---------------------------------------------------------------------------
def _resolve_single_workspace_or_exit(
    role: str | None, workspace_name: str | None,
) -> None:
    """Turn ``--role`` / ``--workspace`` into one ``PIPELINE_WORKSPACE_NAME``.

    ``prune`` and ``drift`` work against a single workspace per run.
    This helper is additive: with neither flag given it returns at once
    and leaves the existing behaviour in place (the
    ``PIPELINE_WORKSPACE_NAME`` env var, or the implicit single-workspace
    pick inside the handler factories).

    When a flag is given:

    * No ``config/tenant.yml`` -> error on stderr, exit 2.
    * The tenant has no Sentinel workspaces -> an info line is printed
      and the selector is ignored, so Defender content can still be
      handled.
    * The selector is rejected (``ValueError`` / ``KeyError``) or matches
      nothing -> error on stderr, exit 2.
    * More than one workspace matches -> error on stderr, exit 2, asking
      the operator to re-run per workspace or use ``contentops apply``.
    * Exactly one workspace matches -> its name is written to
      ``PIPELINE_WORKSPACE_NAME``.

    Call this before handler registration so the factories in
    ``contentops/cli/handler_factories.py`` see the chosen workspace.
    """
    no_selector_given = role is None and workspace_name is None
    if no_selector_given:
        return  # additive — no flag passed, leave the existing behaviour

    from contentops.config import load_tenant_config, select_workspaces

    try:
        tenant = load_tenant_config()
    except FileNotFoundError:
        click.echo(
            "error: --role / --workspace require config/tenant.yml.",
            err=True,
        )
        sys.exit(2)

    # Defender-only tenant: there is nothing for a selector to pick, so
    # note that it was ignored instead of failing the command.
    if not tenant.sentinelWorkspaces:
        click.echo(
            f"info: --role / --workspace ignored — no Sentinel workspaces "
            f"in this tenant ({tenant.name!r}).",
        )
        return

    try:
        matches = select_workspaces(tenant, role=role, workspace=workspace_name)
    except (ValueError, KeyError) as exc:
        click.echo(f"error: {exc}", err=True)
        sys.exit(2)

    if not matches:
        click.echo(
            f"error: no Sentinel workspace matched "
            f"(role={role!r}, workspace={workspace_name!r}).",
            err=True,
        )
        sys.exit(2)

    if len(matches) > 1:
        matched_names = ", ".join(w.workspaceName for w in matches)
        click.echo(
            f"error: --role={role!r} matches {len(matches)} workspaces "
            f"({matched_names}). "
            "This command targets one workspace per run; re-run with "
            "--workspace <name> for each, or use `contentops apply` which "
            "iterates the matched set.",
            err=True,
        )
        sys.exit(2)

    os.environ["PIPELINE_WORKSPACE_NAME"] = matches[0].workspaceName