Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ dependencies = [
"loguru>=0.7.3",
"duckdb>=1.0.0",
"pyarrow>=14.0.0",
"pyyaml>=6.0",
"packaging>=24.0",
]

[project.scripts]
Expand Down
3 changes: 3 additions & 0 deletions src/sema/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,9 @@ def query(
sys.exit(1)


# NOTE(review): imported here, after the command definitions, rather than in
# the top-of-file import block — presumably to avoid an import cycle; confirm.
from sema.cli_target import target_group as _target_group

# Attach each command/group to `cli` under its user-facing name.
cli.add_command(_ingest_group, name="ingest")
cli.add_command(_push_cmd, name="push")
cli.add_command(_eval_group, name="eval")
cli.add_command(_target_group, name="target")
92 changes: 92 additions & 0 deletions src/sema/cli_target.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""CLI: `sema target load --manifest <path>`.

Connects any user-supplied ontology manifest to the target loader and
prints a `LoadedTarget` summary as JSON. Defaults to the in-memory
writer for fast inspection; `--writer neo4j` materialises into the
graph using `Neo4jGraphWriter`.
"""

from __future__ import annotations

import json
import os
import sys
from pathlib import Path

import click

from sema.cli_target_utils import build_summary
from sema.targets.adapters.manifest import (
ManifestTargetAdapter,
register_manifest_adapter,
)
from sema.targets.loader import load_target
from sema.targets.materializer import GraphWriter, InMemoryGraphWriter


@click.group(name="target")
def target_group() -> None:
    """Target ontology operations."""
    # The docstring above doubles as the `--help` text for the group.
    # This function is the group callback, so the manifest adapter is
    # registered before the body of any `target` sub-command executes.
    register_manifest_adapter()


@target_group.command(name="load")
@click.option(
    "--manifest",
    "manifest_path",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    required=True,
    help="Path to the YAML/JSON manifest declaring the target ontology.",
)
@click.option(
    "--writer",
    type=click.Choice(["in-memory", "neo4j"]),
    default="in-memory",
    show_default=True,
    help="Where to materialise the target. `in-memory` records ops without writing.",
)
@click.option(
    "--skip-facet",
    "skip_facets",
    multiple=True,
    help="Repeatable. Operator opt-out per facet (e.g. semantic_aliases).",
)
def load_cmd(
    manifest_path: Path, writer: str, skip_facets: tuple[str, ...]
) -> None:
    """Load a manifest and print a LoadedTarget summary as JSON."""
    # Any failure while building the adapter, building the writer or
    # loading is reported on stderr and turned into a process exit code:
    # 2 for a missing file, 1 for everything else.
    try:
        target = load_target(
            ManifestTargetAdapter(manifest_path),
            writer=_build_writer(writer),
            skip_facets=list(skip_facets),
        )
    except Exception as exc:
        click.echo(f"Error: {exc}", err=True)
        sys.exit(2 if isinstance(exc, FileNotFoundError) else 1)

    # Only reached on success: emit the summary on stdout.
    summary = build_summary(target)
    click.echo(json.dumps(summary, indent=2, default=str))


def _build_writer(kind: str) -> GraphWriter:
    """Return the graph writer selected by the `--writer` option value.

    Raises:
        click.BadParameter: if `kind` is neither "in-memory" nor "neo4j".
    """
    # Map each accepted option value to a zero-argument factory.
    factories = {
        "in-memory": InMemoryGraphWriter,
        "neo4j": _neo4j_writer_from_env,
    }
    make_writer = factories.get(kind)
    if make_writer is None:
        raise click.BadParameter(f"unknown writer: {kind!r}")
    return make_writer()


def _neo4j_writer_from_env() -> GraphWriter:
    """Build a `Neo4jGraphWriter` from NEO4J_* environment variables.

    Reads NEO4J_URI, NEO4J_USER and NEO4J_PASSWORD, each with a built-in
    fallback, opens a driver and hands it to the writer.
    """
    # Both imports stay inside the function so they only happen when the
    # neo4j writer is actually requested.
    neo4j_pkg = __import__("neo4j")
    from sema.targets.neo4j_writer import Neo4jGraphWriter

    env = os.environ
    bolt_uri = env.get("NEO4J_URI", "bolt://localhost:7687")
    credentials = (
        env.get("NEO4J_USER", "neo4j"),
        env.get("NEO4J_PASSWORD", "graphrag"),
    )
    # NOTE(review): the driver is passed to the writer and nothing in this
    # function closes it — confirm the writer (or process exit) owns cleanup.
    return Neo4jGraphWriter(
        neo4j_pkg.GraphDatabase.driver(bolt_uri, auth=credentials)
    )


# Names exported by `from sema.cli_target import *`.
__all__ = ["target_group", "load_cmd"]
48 changes: 48 additions & 0 deletions src/sema/cli_target_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""Helpers for `cli_target.py`: shape `LoadedTarget` for CLI output."""

from __future__ import annotations

from typing import Any

from sema.models.target.loaded import LoadedTarget


def build_summary(loaded: LoadedTarget) -> dict[str, Any]:
    """Flatten a `LoadedTarget` into a plain dict for CLI output.

    The result carries the descriptor's model id/version, the schema
    snapshot hash, the aggregate context-card version, the ISO-formatted
    materialisation timestamp, and one small dict per entity ref,
    enrichment decision and context card.
    """
    descriptor = loaded.descriptor

    # Scalar header fields first; key order is the order they are emitted.
    summary: dict[str, Any] = {
        "target_model_id": descriptor.target_model_id,
        "target_model_version": descriptor.target_model_version,
        "target_schema_snapshot_hash": loaded.target_schema_snapshot_hash,
        "aggregate_context_card_version": loaded.aggregate_context_card_version,
        "materialized_at": loaded.materialized_at.isoformat(),
    }

    summary["entities"] = [
        {"qualified_name": ref.qualified_name, "kind": ref.kind.value}
        for ref in loaded.entity_refs
    ]
    summary["enrichment_decisions"] = [
        _decision_to_dict(decision) for decision in loaded.enrichment_decisions
    ]
    summary["context_cards"] = [
        {
            "entity_ref": card.entity_ref.qualified_name,
            "card_version": card.card_version,
            "card_hash": card.card_hash,
        }
        for card in loaded.context_cards
    ]
    return summary


def _decision_to_dict(record: Any) -> dict[str, Any]:
    """Convert one enrichment-decision record into JSON-friendly dicts.

    Returns the record's entity qualified name plus, for every facet in
    `record.decisions`, that facet's status value and reason, keyed by
    the facet's value.
    """
    entity_name = record.entity_ref.qualified_name

    per_facet: dict[str, Any] = {}
    for facet, facet_decision in record.decisions.items():
        per_facet[facet.value] = {
            "status": facet_decision.status.value,
            "reason": facet_decision.reason,
        }

    return {"entity_ref": entity_name, "decisions": per_facet}


# Only `build_summary` is exported; `_decision_to_dict` stays private.
__all__ = ["build_summary"]
131 changes: 131 additions & 0 deletions src/sema/graph/target_loader_migrations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""Cypher migrations for the target-model-loader storage shape.

Extends the planner contract's `planner-graph-storage` migrations with
the target-side schema artifacts produced by `TargetModelMaterializer`:
the `EnrichmentDecision` label, hash-versioned uniqueness constraints,
and indexes used by enrichment-status query workloads.
"""

from __future__ import annotations


# Facet names used to derive the per-facet Entity status indexes
# (`entity_enrichment_<facet>_status` on `n.enrichment_<facet>_status`)
# that the up migration creates and the down migration drops.
_FACETS = (
    "structure",
    "obligations",
    "vocabulary_bindings",
    "semantic_aliases",
    "terms",
)


def cypher_up() -> list[str]:
    """Return the forward-migration Cypher statements.

    Uniqueness constraints come first, followed by the indexes.
    """
    return [*_uniqueness_constraints(), *_indexes()]


def cypher_down() -> list[str]:
    """Return the reverse-migration Cypher statements.

    Order: drop the per-facet status indexes, drop the remaining indexes,
    drop the uniqueness constraints, then delete every node carrying one
    of the EnrichmentDecision / VocabularyBinding / ContextCard labels.
    """
    index_names = (
        "entity_is_current",
        "property_property_kind",
        "property_is_current",
        "target_obligation_is_current",
        "target_term_is_current",
        "target_constraint_is_current",
        "target_vocab_binding_is_current",
        "target_context_card_is_current",
    )
    constraint_names = (
        "enrichment_decision_unique",
        "target_entity_hash_unique",
        "target_property_hash_unique",
        "target_obligation_hash_unique",
        "target_term_hash_unique",
        "target_constraint_hash_unique",
        "target_vocab_binding_hash_unique",
        "target_context_card_hash_unique",
    )
    # Labels whose nodes are removed outright by the down migration.
    purged_labels = ("EnrichmentDecision", "VocabularyBinding", "ContextCard")

    return [
        *(
            f"DROP INDEX entity_enrichment_{facet}_status IF EXISTS"
            for facet in _FACETS
        ),
        *(f"DROP INDEX {name} IF EXISTS" for name in index_names),
        *(f"DROP CONSTRAINT {name} IF EXISTS" for name in constraint_names),
        *(f"MATCH (n:{label}) DETACH DELETE n" for label in purged_labels),
    ]


def _uniqueness_constraints() -> list[str]:
    """Return one CREATE CONSTRAINT statement per target-side node label.

    Every key starts with the model id, model version and schema snapshot
    hash, followed by the label-specific properties listed in the table
    below.
    """
    # Properties shared by every uniqueness key.
    version_key = (
        "n.target_model_id, n.target_model_version, "
        "n.target_schema_snapshot_hash"
    )
    # (constraint name, node label, label-specific key properties)
    specs = (
        ("enrichment_decision_unique", "EnrichmentDecision", ("entity_ref",)),
        ("target_entity_hash_unique", "Entity", ("qualified_name",)),
        (
            "target_property_hash_unique",
            "Property",
            ("parent_entity_qualified_name", "name"),
        ),
        ("target_obligation_hash_unique", "TargetObligation", ("target_entity",)),
        ("target_term_hash_unique", "Term", ("vocabulary_name", "code")),
        (
            "target_constraint_hash_unique",
            "Constraint",
            ("attached_property_id", "constraint_kind", "payload_hash"),
        ),
        (
            "target_vocab_binding_hash_unique",
            "VocabularyBinding",
            ("parent_entity_qualified_name", "property_name", "vocabulary_name"),
        ),
        (
            "target_context_card_hash_unique",
            "ContextCard",
            ("entity_qualified_name", "card_version"),
        ),
    )

    statements: list[str] = []
    for constraint_name, label, own_props in specs:
        own_key = ", ".join(f"n.{prop}" for prop in own_props)
        statements.append(
            f"CREATE CONSTRAINT {constraint_name} IF NOT EXISTS "
            f"FOR (n:{label}) "
            f"REQUIRE ({version_key}, {own_key}) IS UNIQUE"
        )
    return statements


def _indexes() -> list[str]:
    """Return the CREATE INDEX statements for the forward migration.

    One status index per facet on Entity comes first, followed by the
    fixed single-property indexes listed in the table below.
    """
    facet_indexes = [
        f"CREATE INDEX entity_enrichment_{facet}_status IF NOT EXISTS "
        f"FOR (n:Entity) ON (n.enrichment_{facet}_status)"
        for facet in _FACETS
    ]
    # (index name, node label, indexed property)
    fixed_specs = (
        ("entity_is_current", "Entity", "is_current"),
        ("property_property_kind", "Property", "property_kind"),
        ("property_is_current", "Property", "is_current"),
        ("target_obligation_is_current", "TargetObligation", "is_current"),
        ("target_term_is_current", "Term", "is_current"),
        ("target_constraint_is_current", "Constraint", "is_current"),
        ("target_vocab_binding_is_current", "VocabularyBinding", "is_current"),
        ("target_context_card_is_current", "ContextCard", "is_current"),
    )
    fixed_indexes = [
        f"CREATE INDEX {index_name} IF NOT EXISTS FOR (n:{label}) ON (n.{prop})"
        for index_name, label, prop in fixed_specs
    ]
    return facet_indexes + fixed_indexes


# Only the two migration entry points are exported; the statement
# builders and `_FACETS` stay private.
__all__ = ["cypher_up", "cypher_down"]
8 changes: 8 additions & 0 deletions src/sema/models/target/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""Target-side declarative DTOs.

Adapter-facing models describing what a target ontology requires.
Distinct from `sema.models.planner.target_model`, which defines the
planner-contract graph shape (ModelRole, TargetObligation, etc.).
"""

from __future__ import annotations
29 changes: 29 additions & 0 deletions src/sema/models/target/completeness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""Per-facet semantic-completeness annotations.

A target ontology can be authoritative about some facets and silent about
others; per-facet annotations let the enrichment runner make per-facet
decisions instead of binary all-or-nothing.
"""

from __future__ import annotations

from enum import Enum

from pydantic import BaseModel, ConfigDict


class SemanticCompleteness(str, Enum):
    # Closed set of completeness levels that can be declared for a facet.
    # The `str` mixin makes each member compare equal to its string value
    # (each value is identical to the member name).
    # NOTE(review): the precise meaning of each level is not defined in this
    # module — confirm against the enrichment runner before relying on it.
    COMPLETE = "COMPLETE"
    PARTIAL = "PARTIAL"
    NONE = "NONE"
    EXTERNAL = "EXTERNAL"


class SemanticCompletenessAnnotations(BaseModel):
    # Per-facet completeness declaration: one required
    # `SemanticCompleteness` value for each of the five facets below.
    # `extra="forbid"` rejects unknown fields at validation time and
    # `frozen=True` makes instances immutable after construction.
    model_config = ConfigDict(extra="forbid", frozen=True)

    # No field has a default, so all five must be supplied.
    structure: SemanticCompleteness
    obligations: SemanticCompleteness
    vocabulary_bindings: SemanticCompleteness
    semantic_aliases: SemanticCompleteness
    terms: SemanticCompleteness
Loading
Loading