Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ backups/

# Eval run artifacts — local by default (previously-tracked runs stay tracked)
eval-runs/
.runs/
20 changes: 15 additions & 5 deletions src/sema/graph/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,10 @@ def batch_upsert_properties(

def batch_upsert_terms(
self, terms: list[dict[str, Any]],
source_schema: str | None = None,
) -> None:
from sema.graph import loader_utils as _lu
_lu.batch_upsert_terms(self, terms)
_lu.batch_upsert_terms(self, terms, source_schema=source_schema)

def batch_upsert_aliases(
self, aliases: list[dict[str, Any]], parent_label: str,
Expand Down Expand Up @@ -170,7 +171,9 @@ def upsert_entity(
"ON CREATE SET e.id = $id "
"SET e.description = $description, e.source = $source, "
"e.confidence = $confidence, "
"e.resolved_at = $resolved_at "
"e.resolved_at = $resolved_at, "
"e.model_role = coalesce(e.model_role, 'SOURCE'), "
"e.source_id = coalesce(e.source_id, $source_schema, $source) "
"WITH e "
"MERGE (t:Table {name: $table_name, "
"schema_name: $schema_name, catalog: $catalog}) "
Expand All @@ -197,9 +200,13 @@ def upsert_property(
"SET p.semantic_type = $semantic_type, "
"p.source = $source, "
"p.confidence = $confidence, "
"p.resolved_at = $resolved_at "
"p.resolved_at = $resolved_at, "
"p.model_role = coalesce(p.model_role, 'SOURCE'), "
"p.source_id = coalesce(p.source_id, $source_schema, $source) "
"WITH p "
"MERGE (e:Entity {name: $entity_name}) "
"SET e.model_role = coalesce(e.model_role, 'SOURCE'), "
"e.source_id = coalesce(e.source_id, $source_schema, $source) "
"MERGE (e)-[hp:HAS_PROPERTY "
"{source_schema: $source_schema}]->(p) "
"WITH p "
Expand All @@ -219,18 +226,21 @@ def upsert_property(
def upsert_term(
self, code: str, label: str, source: str,
confidence: float,
source_schema: str | None = None,
) -> None:
id_ = str(uuid.uuid4())
self._run(
"MERGE (t:Term {code: $code}) "
"ON CREATE SET t.id = $id "
"SET t.label = $label, t.source = $source, "
"t.confidence = $confidence, "
"t.resolved_at = $resolved_at",
"t.resolved_at = $resolved_at, "
"t.model_role = coalesce(t.model_role, 'SOURCE'), "
"t.source_id = coalesce(t.source_id, $source_schema, $source)",
code=code, label=label, source=source,
confidence=confidence,
resolved_at=datetime.now(timezone.utc).isoformat(),
id=id_,
id=id_, source_schema=source_schema,
)

def upsert_value_set(
Expand Down
22 changes: 18 additions & 4 deletions src/sema/graph/loader_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ def batch_upsert_entities(
"SET e.description = r.description, e.source = r.source, "
"e.confidence = r.confidence, "
"e.status = 'ACTIVE', "
"e.resolved_at = r.resolved_at "
"e.resolved_at = r.resolved_at, "
"e.model_role = coalesce(e.model_role, 'SOURCE'), "
"e.source_id = coalesce(e.source_id, r.source_schema, r.source) "
"WITH e, r "
"MERGE (t:Table {name: r.table_name, "
"schema_name: r.schema_name, catalog: r.catalog}) "
Expand All @@ -108,9 +110,13 @@ def batch_upsert_properties(
"p.source = r.source, "
"p.confidence = r.confidence, "
"p.status = 'ACTIVE', "
"p.resolved_at = r.resolved_at "
"p.resolved_at = r.resolved_at, "
"p.model_role = coalesce(p.model_role, 'SOURCE'), "
"p.source_id = coalesce(p.source_id, r.source_schema, r.source) "
"WITH p, r "
"MERGE (e:Entity {name: r.entity_name}) "
"SET e.model_role = coalesce(e.model_role, 'SOURCE'), "
"e.source_id = coalesce(e.source_id, r.source_schema, r.source) "
"MERGE (e)-[hp:HAS_PROPERTY "
"{source_schema: r.source_schema}]->(p) "
"WITH p, r "
Expand All @@ -125,12 +131,18 @@ def batch_upsert_properties(

def batch_upsert_terms(
loader: GraphLoader, terms: list[dict[str, Any]],
source_schema: str | None = None,
) -> None:
if not terms:
return
resolved_at = datetime.now(timezone.utc).isoformat()
rows = [
{**t, "resolved_at": resolved_at, "id": str(uuid.uuid4())}
{
**t,
"resolved_at": resolved_at,
"id": str(uuid.uuid4()),
"source_schema": source_schema,
}
for t in terms
]
loader._run(
Expand All @@ -141,7 +153,9 @@ def batch_upsert_terms(
"t.confidence = r.confidence, "
"t.vocabulary_name = r.vocabulary_name, "
"t.status = 'ACTIVE', "
"t.resolved_at = r.resolved_at",
"t.resolved_at = r.resolved_at, "
"t.model_role = coalesce(t.model_role, 'SOURCE'), "
"t.source_id = coalesce(t.source_id, r.source_schema, r.source)",
rows=rows,
)

Expand Down
2 changes: 1 addition & 1 deletion src/sema/graph/materializer_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ def upsert_decoded_values(
batch_upsert_value_sets(
loader, vs_batch, source_schema=source_schema,
)
batch_upsert_terms(loader, term_batch)
batch_upsert_terms(loader, term_batch, source_schema=source_schema)


def _collect_alias_batch(
Expand Down
Loading
Loading