From 9df5657bb3ef4e19dff69c4c91daf506d86ccd10 Mon Sep 17 00:00:00 2001 From: Thomas Christory Date: Wed, 13 May 2026 00:18:38 +0200 Subject: [PATCH 1/2] refactor(builder): simplify builder package - Collapse `is_collection` two-step logic to `not remainder` - Drop unused `is_collection_path` param from `_attach`; drop unused `schema_op`/`resource_name` params from `_classify` - Fold `synthesized` intermediate var into single `operation_id =` expression - Extract `is_object_like` to eliminate duplicate `top_level in (...)` test - Replace double `model_extra or {}` double-access with a single `extras` var in `_classify_top_level`, `_object_branch`, and `_collect_sensitive_paths` - Simplify `_collect_sensitive_paths` ref-cycle guard: track `schema.ref` only (the ref we entered with) - Replace `getattr(schema, "format", None)` with direct `schema.format` (typed field) No behaviour changes. All tests pass. Co-Authored-By: Claude Sonnet 4.6 --- nsc/builder/build.py | 79 +++++++++++++++----------------------------- 1 file changed, 26 insertions(+), 53 deletions(-) diff --git a/nsc/builder/build.py b/nsc/builder/build.py index 1b58975..a98b82a 100644 --- a/nsc/builder/build.py +++ b/nsc/builder/build.py @@ -114,7 +114,7 @@ def _assimilate( if tag_name is None: return # untagged operations are skipped intentionally - resource_name, is_collection_path = _resource_from_path(path, tag_name) + resource_name, _ = _resource_from_path(path, tag_name) if resource_name is None: return # paths that don't match the /api///... shape @@ -129,8 +129,7 @@ def _assimilate( tag.resources[resource_name] = resource op = _to_model_operation(path, http_method, schema_op, item, doc) - classification = _classify(http_method, path, schema_op, resource_name) - _attach(resource, classification, op, is_collection_path) + _attach(resource, _classify(http_method, path), op) def _resource_from_path(path: str, tag: str) -> tuple[str | None, bool]: @@ -168,11 +167,7 @@ def _resource_from_path(path: str, tag: str) -> tuple[str | None, bool]: if _PARAM_SEGMENT.match(resource): return None, False remainder = parts[anchor + 2 :] - is_collection = all(not _PARAM_SEGMENT.match(p) for p in remainder) and not remainder - # If the only remaining segment is `{id}` it's the per-item path: - if len(remainder) == 1 and _PARAM_SEGMENT.match(remainder[0]): - is_collection = False - return resource, is_collection + return resource, not remainder _CRUD_MAP: dict[tuple[HttpMethod, bool], str] = { @@ -185,30 +180,16 @@ def _resource_from_path(path: str, tag: str) -> tuple[str | None, bool]: } -def _classify( - method: HttpMethod, - path: str, - schema_op: SchemaOperation, - resource_name: str, -) -> str: +def _classify(method: HttpMethod, path: str) -> str: has_id = "{id}" in path - extra_path_segments = path.rstrip("/").split("/")[-1] - # Path is a custom action endpoint if `{id}` appears AND the last segment - # is a literal name (not itself a path parameter like `{id}`). - is_action_endpoint = has_id and not _PARAM_SEGMENT.match(extra_path_segments) - - if is_action_endpoint: + last_segment = path.rstrip("/").split("/")[-1] + # Custom action: `{id}` appears but the final segment is a literal name, not a parameter. + if has_id and not _PARAM_SEGMENT.match(last_segment): return "custom" - return _CRUD_MAP.get((method, has_id), "custom") -def _attach( - resource: _MutableResource, - classification: str, - op: Operation, - is_collection_path: bool, # kept for future heuristics -) -> None: +def _attach(resource: _MutableResource, classification: str, op: Operation) -> None: match classification: case "list": resource.list_op = op @@ -233,13 +214,11 @@ def _to_model_operation( item: PathItem, doc: OpenAPIDocument, ) -> Operation: - if schema_op.operation_id is None: - # Synthesize one — operationId is required in practice but we degrade - # gracefully rather than crashing on hand-rolled schemas. - synthesized = f"{http_method.value.lower()}_{path.strip('/').replace('/', '_')}" - operation_id = synthesized - else: - operation_id = schema_op.operation_id + # Synthesize an operationId when absent — required in practice but we degrade + # gracefully rather than crashing on hand-rolled schemas. + operation_id = schema_op.operation_id or ( + f"{http_method.value.lower()}_{path.strip('/').replace('/', '_')}" + ) seen: dict[str, Parameter] = {} for source_param in (*item.parameters, *schema_op.parameters): @@ -289,12 +268,13 @@ def _to_request_body_shape( top_level = _classify_top_level(schema, doc) if top_level is None: return None - if top_level in ("object", "object_or_array"): + is_object_like = top_level in ("object", "object_or_array") + if is_object_like: branch = _object_branch(schema, doc) required = list(branch.required or []) if branch is not None else [] else: required = [] - fields = _flat_fields(schema, doc) if top_level in ("object", "object_or_array") else {} + fields = _flat_fields(schema, doc) if is_object_like else {} raw_paths = _collect_sensitive_paths(schema, doc) sensitive_paths = tuple(sorted({".".join(p) for p in raw_paths if p})) return RequestBodyShape( @@ -326,9 +306,8 @@ def _classify_top_level( ) -> Literal["object", "array", "object_or_array"] | None: if schema.type in ("object", "array"): return schema.type # type: ignore[return-value] - one_of = schema.model_extra.get("oneOf") if schema.model_extra else None - any_of = schema.model_extra.get("anyOf") if schema.model_extra else None - candidates = one_of or any_of or None + extras = schema.model_extra or {} + candidates = extras.get("oneOf") or extras.get("anyOf") or None if not candidates: return None types = {t for raw in candidates if (t := _branch_type(raw, doc)) is not None} @@ -343,9 +322,8 @@ def _object_branch(schema: SchemaObject, doc: OpenAPIDocument) -> SchemaObject | """Find the object-shaped variant of a oneOf/anyOf, resolving $ref branches.""" if schema.type == "object": return schema - candidates = ( - (schema.model_extra or {}).get("oneOf") or (schema.model_extra or {}).get("anyOf") or [] - ) + extras = schema.model_extra or {} + candidates = extras.get("oneOf") or extras.get("anyOf") or [] for raw in candidates: if not isinstance(raw, dict): continue @@ -400,7 +378,7 @@ def _resolve_ref(schema: SchemaObject, doc: OpenAPIDocument) -> SchemaObject | N def _is_sensitive_field(name: str, schema: SchemaObject) -> bool: if name.lower() in _CANONICAL_SENSITIVE_NAMES: return True - return getattr(schema, "format", None) == "password" + return schema.format == "password" def _collect_sensitive_paths( @@ -416,15 +394,12 @@ def _collect_sensitive_paths( self-referential schemas (NetBox has a few via `tags`). """ target = _resolve_ref(schema, doc) or schema - new_seen = seen - if target.ref is not None: - if target.ref in seen: - return [] - new_seen = seen | {target.ref} - elif schema.ref is not None: + if schema.ref is not None: if schema.ref in seen: return [] new_seen = seen | {schema.ref} + else: + new_seen = seen paths: list[tuple[str, ...]] = [] @@ -442,10 +417,8 @@ def _collect_sensitive_paths( if target.type == "array" and target.items is not None: paths.extend(_collect_sensitive_paths(target.items, doc, prefix=prefix, seen=new_seen)) - candidates = ( - (target.model_extra or {}).get("oneOf") or (target.model_extra or {}).get("anyOf") or [] - ) - for raw in candidates: + target_extras = target.model_extra or {} + for raw in target_extras.get("oneOf") or target_extras.get("anyOf") or []: if isinstance(raw, dict): branch = SchemaObject.model_validate(raw) paths.extend(_collect_sensitive_paths(branch, doc, prefix=prefix, seen=new_seen)) From 5f8947d2a51ce8314ad7b174fd764133de88d354 Mon Sep 17 00:00:00 2001 From: Thomas Christory Date: Wed, 13 May 2026 08:01:15 +0200 Subject: [PATCH 2/2] refactor(writes): simplify cli writes pipeline - preflight: drop redundant `expected=None` (field default) - bulk: use try/except/else to eliminate dead `continue` at loop-body end - apply: fold _OTHER_SENSITIVE_HEADERS into _redact, lowering k once per iteration - input: remove what-comments from _classify_brace_start and _parse_text No behaviour changes. 608 tests pass. Co-Authored-By: Claude Sonnet 4.6 --- nsc/cli/writes/apply.py | 8 +++----- nsc/cli/writes/bulk.py | 6 +++--- nsc/cli/writes/input.py | 6 +----- nsc/cli/writes/preflight.py | 1 - 4 files changed, 7 insertions(+), 14 deletions(-) diff --git a/nsc/cli/writes/apply.py b/nsc/cli/writes/apply.py index a66cf2e..1c6fcee 100644 --- a/nsc/cli/writes/apply.py +++ b/nsc/cli/writes/apply.py @@ -155,15 +155,13 @@ def _cast_bool(value: Any) -> Any: return value -_OTHER_SENSITIVE_HEADERS = SENSITIVE_HEADERS - {"authorization"} - - def _redact(headers: dict[str, str]) -> dict[str, str]: out: dict[str, str] = {} for k, v in headers.items(): - if k.lower() == "authorization": + lowered = k.lower() + if lowered == "authorization": out[k] = _REDACTED_AUTH - elif k.lower() in _OTHER_SENSITIVE_HEADERS: + elif lowered in SENSITIVE_HEADERS: out[k] = "" else: out[k] = v diff --git a/nsc/cli/writes/bulk.py b/nsc/cli/writes/bulk.py index ca609ee..5ff966d 100644 --- a/nsc/cli/writes/bulk.py +++ b/nsc/cli/writes/bulk.py @@ -209,9 +209,9 @@ def run_loop( attempts.append(LoopAttempt(request=request, response=None, failure=failure)) if on_error == "stop": break - continue - audit_attempt(request, response, None) - attempts.append(LoopAttempt(request=request, response=response, failure=None)) + else: + audit_attempt(request, response, None) + attempts.append(LoopAttempt(request=request, response=response, failure=None)) return LoopResult(attempts=attempts) diff --git a/nsc/cli/writes/input.py b/nsc/cli/writes/input.py index fe2a4fc..a0da28a 100644 --- a/nsc/cli/writes/input.py +++ b/nsc/cli/writes/input.py @@ -94,8 +94,8 @@ def collect( used_file = True field_overlay = _parse_fields(fields) - records = _merge(file_records, field_overlay, used_file=used_file) + source: Literal["file", "stdin", "fields_only", "file_plus_fields"] if used_stdin: source = "stdin" @@ -232,9 +232,7 @@ def _classify_brace_start(stripped: str) -> str: if depth == 0: rest = stripped[i + 1 :] if rest.strip(): - # Any non-whitespace after the first object → NDJSON. return "ndjson" - # Nothing follows → single object. return "json_or_yaml" # Buffer ended mid-object; default to JSON-or-YAML. return "json_or_yaml" @@ -254,13 +252,11 @@ def _parse_text(text: str, *, hint: str | None) -> tuple[list[dict[str, Any]], b if hint == "ndjson": return _parse_ndjson(text), True if hint == ".json": - # File-extension was .json; require strict JSON, no YAML fallback. try: parsed = json.loads(text) except json.JSONDecodeError as exc: raise InputError(f"could not parse JSON input: {exc}") from exc elif hint == "json_or_yaml" or (hint is None and text.lstrip().startswith(("{", "["))): - # Stdin sniffer hint OR legacy fallback: try JSON first, then YAML. try: parsed = json.loads(text) except json.JSONDecodeError: diff --git a/nsc/cli/writes/preflight.py b/nsc/cli/writes/preflight.py index 964e4c6..e2bdc2b 100644 --- a/nsc/cli/writes/preflight.py +++ b/nsc/cli/writes/preflight.py @@ -62,7 +62,6 @@ def _check_required(index: int, record: dict[str, Any], body: RequestBodyShape) field_path=name, kind="missing_required", message=f"required field {name!r} is missing", - expected=None, ) for name in body.required if name not in record