Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 26 additions & 53 deletions nsc/builder/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def _assimilate(
if tag_name is None:
return # untagged operations are skipped intentionally

resource_name, is_collection_path = _resource_from_path(path, tag_name)
resource_name, _ = _resource_from_path(path, tag_name)
if resource_name is None:
return # paths that don't match the /api/<tag>/<resource>/... shape

Expand All @@ -129,8 +129,7 @@ def _assimilate(
tag.resources[resource_name] = resource

op = _to_model_operation(path, http_method, schema_op, item, doc)
classification = _classify(http_method, path, schema_op, resource_name)
_attach(resource, classification, op, is_collection_path)
_attach(resource, _classify(http_method, path), op)


def _resource_from_path(path: str, tag: str) -> tuple[str | None, bool]:
Expand Down Expand Up @@ -168,11 +167,7 @@ def _resource_from_path(path: str, tag: str) -> tuple[str | None, bool]:
if _PARAM_SEGMENT.match(resource):
return None, False
remainder = parts[anchor + 2 :]
is_collection = all(not _PARAM_SEGMENT.match(p) for p in remainder) and not remainder
# If the only remaining segment is `{id}` it's the per-item path:
if len(remainder) == 1 and _PARAM_SEGMENT.match(remainder[0]):
is_collection = False
return resource, is_collection
return resource, not remainder


_CRUD_MAP: dict[tuple[HttpMethod, bool], str] = {
Expand All @@ -185,30 +180,16 @@ def _resource_from_path(path: str, tag: str) -> tuple[str | None, bool]:
}


def _classify(
method: HttpMethod,
path: str,
schema_op: SchemaOperation,
resource_name: str,
) -> str:
def _classify(method: HttpMethod, path: str) -> str:
has_id = "{id}" in path
extra_path_segments = path.rstrip("/").split("/")[-1]
# Path is a custom action endpoint if `{id}` appears AND the last segment
# is a literal name (not itself a path parameter like `{id}`).
is_action_endpoint = has_id and not _PARAM_SEGMENT.match(extra_path_segments)

if is_action_endpoint:
last_segment = path.rstrip("/").split("/")[-1]
# Custom action: `{id}` appears but the final segment is a literal name, not a parameter.
if has_id and not _PARAM_SEGMENT.match(last_segment):
return "custom"

return _CRUD_MAP.get((method, has_id), "custom")


def _attach(
resource: _MutableResource,
classification: str,
op: Operation,
is_collection_path: bool, # kept for future heuristics
) -> None:
def _attach(resource: _MutableResource, classification: str, op: Operation) -> None:
match classification:
case "list":
resource.list_op = op
Expand All @@ -233,13 +214,11 @@ def _to_model_operation(
item: PathItem,
doc: OpenAPIDocument,
) -> Operation:
if schema_op.operation_id is None:
# Synthesize one — operationId is required in practice but we degrade
# gracefully rather than crashing on hand-rolled schemas.
synthesized = f"{http_method.value.lower()}_{path.strip('/').replace('/', '_')}"
operation_id = synthesized
else:
operation_id = schema_op.operation_id
# Synthesize an operationId when absent — required in practice but we degrade
# gracefully rather than crashing on hand-rolled schemas.
operation_id = schema_op.operation_id or (
f"{http_method.value.lower()}_{path.strip('/').replace('/', '_')}"
)

seen: dict[str, Parameter] = {}
for source_param in (*item.parameters, *schema_op.parameters):
Expand Down Expand Up @@ -289,12 +268,13 @@ def _to_request_body_shape(
top_level = _classify_top_level(schema, doc)
if top_level is None:
return None
if top_level in ("object", "object_or_array"):
is_object_like = top_level in ("object", "object_or_array")
if is_object_like:
branch = _object_branch(schema, doc)
required = list(branch.required or []) if branch is not None else []
else:
required = []
fields = _flat_fields(schema, doc) if top_level in ("object", "object_or_array") else {}
fields = _flat_fields(schema, doc) if is_object_like else {}
raw_paths = _collect_sensitive_paths(schema, doc)
sensitive_paths = tuple(sorted({".".join(p) for p in raw_paths if p}))
return RequestBodyShape(
Expand Down Expand Up @@ -326,9 +306,8 @@ def _classify_top_level(
) -> Literal["object", "array", "object_or_array"] | None:
if schema.type in ("object", "array"):
return schema.type # type: ignore[return-value]
one_of = schema.model_extra.get("oneOf") if schema.model_extra else None
any_of = schema.model_extra.get("anyOf") if schema.model_extra else None
candidates = one_of or any_of or None
extras = schema.model_extra or {}
candidates = extras.get("oneOf") or extras.get("anyOf") or None
if not candidates:
return None
types = {t for raw in candidates if (t := _branch_type(raw, doc)) is not None}
Expand All @@ -343,9 +322,8 @@ def _object_branch(schema: SchemaObject, doc: OpenAPIDocument) -> SchemaObject |
"""Find the object-shaped variant of a oneOf/anyOf, resolving $ref branches."""
if schema.type == "object":
return schema
candidates = (
(schema.model_extra or {}).get("oneOf") or (schema.model_extra or {}).get("anyOf") or []
)
extras = schema.model_extra or {}
candidates = extras.get("oneOf") or extras.get("anyOf") or []
for raw in candidates:
if not isinstance(raw, dict):
continue
Expand Down Expand Up @@ -400,7 +378,7 @@ def _resolve_ref(schema: SchemaObject, doc: OpenAPIDocument) -> SchemaObject | N
def _is_sensitive_field(name: str, schema: SchemaObject) -> bool:
if name.lower() in _CANONICAL_SENSITIVE_NAMES:
return True
return getattr(schema, "format", None) == "password"
return schema.format == "password"


def _collect_sensitive_paths(
Expand All @@ -416,15 +394,12 @@ def _collect_sensitive_paths(
self-referential schemas (NetBox has a few via `tags`).
"""
target = _resolve_ref(schema, doc) or schema
new_seen = seen
if target.ref is not None:
if target.ref in seen:
return []
new_seen = seen | {target.ref}
elif schema.ref is not None:
if schema.ref is not None:
if schema.ref in seen:
return []
new_seen = seen | {schema.ref}
else:
new_seen = seen

paths: list[tuple[str, ...]] = []

Expand All @@ -442,10 +417,8 @@ def _collect_sensitive_paths(
if target.type == "array" and target.items is not None:
paths.extend(_collect_sensitive_paths(target.items, doc, prefix=prefix, seen=new_seen))

candidates = (
(target.model_extra or {}).get("oneOf") or (target.model_extra or {}).get("anyOf") or []
)
for raw in candidates:
target_extras = target.model_extra or {}
for raw in target_extras.get("oneOf") or target_extras.get("anyOf") or []:
if isinstance(raw, dict):
branch = SchemaObject.model_validate(raw)
paths.extend(_collect_sensitive_paths(branch, doc, prefix=prefix, seen=new_seen))
Expand Down
Loading