From 59c6e629a59d9da1838597d92cb91debdb3995ad Mon Sep 17 00:00:00 2001 From: vmihalovski Date: Wed, 3 Jun 2026 18:20:51 +0200 Subject: [PATCH] Adding ontology Python project to the converters folder --- converters/ontology/.gitignore | 42 + converters/ontology/README.md | 93 ++ converters/ontology/pyproject.toml | 29 + converters/ontology/requirements.lock | 21 + .../ontology/scripts/palantir_to_osi.py | 55 + converters/ontology/src/osi/__init__.py | 72 ++ .../ontology/src/osi/common/__init__.py | 0 .../ontology/src/osi/common/file_utils.py | 42 + converters/ontology/src/osi/common/graph.py | 180 +++ converters/ontology/src/osi/common/utils.py | 43 + .../ontology/src/osi/converter/__init__.py | 0 .../src/osi/converter/osi_to_spec/__init__.py | 0 .../osi/converter/osi_to_spec/converter.py | 271 ++++ .../osi/converter/palantir_to_osi/__init__.py | 0 .../converter/palantir_to_osi/converter.py | 778 ++++++++++++ .../src/osi/converter/spec_to_osi/__init__.py | 0 .../osi/converter/spec_to_osi/converter.py | 499 ++++++++ .../ontology/src/osi/external/__init__.py | 0 .../src/osi/external/palantir/__init__.py | 0 .../src/osi/external/palantir/model.py | 632 +++++++++ .../osi/external/palantir/parser/__init__.py | 680 ++++++++++ converters/ontology/src/osi/model.py | 1128 +++++++++++++++++ .../ontology/src/osi/parser/__init__.py | 49 + converters/ontology/src/osi/spec.py | 248 ++++ 24 files changed, 4862 insertions(+) create mode 100644 converters/ontology/.gitignore create mode 100644 converters/ontology/README.md create mode 100644 converters/ontology/pyproject.toml create mode 100644 converters/ontology/requirements.lock create mode 100644 converters/ontology/scripts/palantir_to_osi.py create mode 100644 converters/ontology/src/osi/__init__.py create mode 100644 converters/ontology/src/osi/common/__init__.py create mode 100644 converters/ontology/src/osi/common/file_utils.py create mode 100644 converters/ontology/src/osi/common/graph.py create mode 100644 
converters/ontology/src/osi/common/utils.py create mode 100644 converters/ontology/src/osi/converter/__init__.py create mode 100644 converters/ontology/src/osi/converter/osi_to_spec/__init__.py create mode 100644 converters/ontology/src/osi/converter/osi_to_spec/converter.py create mode 100644 converters/ontology/src/osi/converter/palantir_to_osi/__init__.py create mode 100644 converters/ontology/src/osi/converter/palantir_to_osi/converter.py create mode 100644 converters/ontology/src/osi/converter/spec_to_osi/__init__.py create mode 100644 converters/ontology/src/osi/converter/spec_to_osi/converter.py create mode 100644 converters/ontology/src/osi/external/__init__.py create mode 100644 converters/ontology/src/osi/external/palantir/__init__.py create mode 100644 converters/ontology/src/osi/external/palantir/model.py create mode 100644 converters/ontology/src/osi/external/palantir/parser/__init__.py create mode 100644 converters/ontology/src/osi/model.py create mode 100644 converters/ontology/src/osi/parser/__init__.py create mode 100644 converters/ontology/src/osi/spec.py diff --git a/converters/ontology/.gitignore b/converters/ontology/.gitignore new file mode 100644 index 0000000..211fd5c --- /dev/null +++ b/converters/ontology/.gitignore @@ -0,0 +1,42 @@ +# Python +__pycache__/ +*.py[cod] +*.pyo +*.pyd + +# Virtual environments +venv/ +.venv/ +env/ + +# pyenv +.python-version + +# Build / packaging +dist/ +build/ +*.egg-info/ +*.egg +.eggs/ + +# Pytest +.pytest_cache/ +.coverage +htmlcov/ + +# Mypy +.mypy_cache/ + +# Ruff +.ruff_cache/ + +# VS Code +.vscode/ +*.code-workspace +.history/ + +# JetBrains (PyCharm, IntelliJ, etc.) +.idea/ +*.iml +*.iws +*.ipr \ No newline at end of file diff --git a/converters/ontology/README.md b/converters/ontology/README.md new file mode 100644 index 0000000..0e09873 --- /dev/null +++ b/converters/ontology/README.md @@ -0,0 +1,93 @@ +# OSI Ontology Converters + +Converters between OSI, Palantir, and Spec ontology formats. 
+ +| Converter | Direction | +|-----------|-----------| +| `palantir_to_osi` | Palantir ontology → OSI model | +| `osi_to_spec` | OSI model → Spec YAML | +| `spec_to_osi` | Spec YAML → OSI model | + +## Prerequisites + +- [pyenv](https://github.com/pyenv/pyenv) — manages the Python version + +Install pyenv if you don't have it: + +```bash +brew install pyenv +``` + +Add to your shell profile (`~/.zshrc` or `~/.bashrc`) and restart the shell: + +```bash +export PYENV_ROOT="$HOME/.pyenv" +export PATH="$PYENV_ROOT/bin:$PATH" +eval "$(pyenv init -)" +``` + +## Setup + +```bash +pyenv install 3.11 +pyenv local 3.11 +pip install --upgrade pip +pip install virtualenv +python -m virtualenv venv +source ./venv/bin/activate +pip install -r requirements.lock +pip install -e ".[dev]" +``` + +## Generating / updating the lock file + +`requirements.lock` is produced by [pip-tools](https://github.com/jazzband/pip-tools) from `pyproject.toml`. +Run this whenever you add or change a dependency: + +```bash +pip-compile --output-file requirements.lock pyproject.toml +``` + +## Usage + +The package is importable as `osi` after installation: + +```python +from osi.converter.palantir_to_osi.converter import PalantirToOsiConverter +from osi.converter.osi_to_spec.converter import OsiToSpecConverter +from osi.converter.spec_to_osi.converter import SpecToOsiConverter +``` + +## Scripts + +### `scripts/palantir_to_osi.py` + +Converts a Palantir ontology export (`.zip` file containing a Palantir ontology JSON and one or more dataset spec JSON files) into an OSI-compliant YAML representation, printed to stdout. + +**Usage:** + +```bash +python scripts/palantir_to_osi.py path/to/palantir_export.zip +``` + +Warnings are written to stderr; the OSI YAML is written to stdout. 
+ +**Environment variables (optional):** + +| Variable | Default | Description | +|---------------------------|------------|----------------------------------------------------------| +| `SNOWFLAKE_DATABASE_NAME` | `PALANTIR` | Snowflake database name used to qualify table references | +| `SNOWFLAKE_SCHEMA_NAME` | `PALANTIR` | Snowflake schema name used to qualify table references | + +If already set in your environment they will be picked up automatically. To override them for a single run: + +```bash +SNOWFLAKE_DATABASE_NAME=MY_DB SNOWFLAKE_SCHEMA_NAME=MY_SCHEMA \ + python scripts/palantir_to_osi.py path/to/palantir_export.zip +``` + +## Deactivating the environment + +```bash +deactivate +``` \ No newline at end of file diff --git a/converters/ontology/pyproject.toml b/converters/ontology/pyproject.toml new file mode 100644 index 0000000..6a88147 --- /dev/null +++ b/converters/ontology/pyproject.toml @@ -0,0 +1,29 @@ +[project] +name = "ontology" +version = "0.1.0" +description = "OSI ontology converters — Palantir → OSI, OSI → Spec, Spec → OSI" +readme = "README.md" +authors = [ + { name = "RelationalAI", email = "support@relational.ai" }, +] +requires-python = ">= 3.11" +dependencies = [ + "pydantic", + "pyyaml", +] + +[project.optional-dependencies] +dev = [ + "pytest==9.0.3", + "pytest-snapshot", + "parameterized", + "pip-tools", +] + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.pytest.ini_options] +testpaths = ["tests"] + +pythonVersion = "3.11" \ No newline at end of file diff --git a/converters/ontology/requirements.lock b/converters/ontology/requirements.lock new file mode 100644 index 0000000..39f9e31 --- /dev/null +++ b/converters/ontology/requirements.lock @@ -0,0 +1,21 @@ +# +# This file is autogenerated by pip-compile with Python 3.12 +# by the following command: +# +# pip-compile --output-file=requirements.lock pyproject.toml +# +annotated-types==0.7.0 + # via pydantic +pydantic==2.13.4 + # via osi-ontology-converters (pyproject.toml) 
# Description:
#
# This script converts a Palantir ontology export into an OSI compliant YAML
# representation of that ontology. The export is a zip file that contains:
#   1. A Palantir ontology (JSON file) and
#   2. A folder containing one or more Palantir dataset specs (JSON files)
# Environment variables configure the Snowflake database and schema names.
#
# Usage:
#
#   $ python palantir_to_osi.py path/to/palantir_export.zip
#
# Environment variables used (both default to "PALANTIR"):
#
#   - SNOWFLAKE_DATABASE_NAME
#   - SNOWFLAKE_SCHEMA_NAME
#
# The tables that populate the ontology are named
#   "{SNOWFLAKE_DATABASE_NAME}.{SNOWFLAKE_SCHEMA_NAME}.{TABLE_NAME}"
# where TABLE_NAME is the name of a data set that is referenced in
# the Palantir ontology.
#
# Outputs:
#
#   - stdout: The OSI YAML
#   - stderr: Warnings and usage errors
#
import os
import sys
from pathlib import Path

from osi.converter.osi_to_spec.converter import OsiToSpecConverter
from osi.converter.palantir_to_osi.converter import PalantirToOsiConverter
from osi.external.palantir.parser import PalantirParser


def main(argv: list[str]) -> int:
    """Convert the Palantir export named on the command line and print OSI YAML.

    Args:
        argv: The full argument vector, i.e. ``sys.argv`` (program name first).

    Returns:
        The process exit status: ``0`` on success, ``2`` when the command line
        does not consist of exactly one path argument.
    """
    if len(argv) != 2:
        # A usage mistake is not a crash: report it without a traceback.
        print(f"Usage: {argv[0]} <path to Palantir export (.zip)>", file=sys.stderr)
        return 2

    db_name = os.environ.get("SNOWFLAKE_DATABASE_NAME", "PALANTIR")
    schema_name = os.environ.get("SNOWFLAKE_SCHEMA_NAME", "PALANTIR")

    path = Path(argv[1])
    parser = PalantirParser()

    # Zip archives must be read as bytes; any other input is handed to the
    # parser as a text stream (NOTE(review): confirm which non-zip inputs the
    # parser accepts).
    mode = "rb" if path.suffix.lower() == ".zip" else "r"
    with open(path, mode) as file:
        parser.parse(file)

    ontology_model = PalantirToOsiConverter.convert(parser.model(), db_name, schema_name)
    osi_spec = OsiToSpecConverter.convert(ontology_model)
    print(osi_spec.dump_yaml())
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))
+""" + +from osi.model import ( + Concept, + ConceptMapping, + ConceptType, + CustomExtension, + Dataset, + DatasetField, + DialectExpression, + DialectExpressionSet, + Formula, + JoinPath, + LinkMapping, + Metric, + ObjectMapping, + OntologyComponent, + OntologyMapping, + OsiOntology, + ReferentMapping, + Relationship, + RelationshipMultiplicity, + Role, + SemanticModel, +) +from osi.spec import OsiSpec +from osi.parser import OsiParser +from osi.external.palantir.parser import PalantirParser +from osi.converter.spec_to_osi.converter import SpecToOsiConverter +from osi.converter.osi_to_spec.converter import OsiToSpecConverter +from osi.converter.palantir_to_osi.converter import PalantirToOsiConverter + +__all__ = [ + # Model — ontology layer + "Concept", + "ConceptType", + "Relationship", + "RelationshipMultiplicity", + "Role", + "Formula", + # Model — semantic layer + "Dataset", + "DatasetField", + "DialectExpression", + "DialectExpressionSet", + "JoinPath", + "Metric", + "SemanticModel", + # Model — mapping layer + "ObjectMapping", + "ReferentMapping", + "LinkMapping", + "ConceptMapping", + "OntologyMapping", + "OntologyComponent", + "OsiOntology", + # Supporting types + "CustomExtension", + # Spec DTO + "OsiSpec", + # Parsers + "OsiParser", + "PalantirParser", + # Converters + "SpecToOsiConverter", + "OsiToSpecConverter", + "PalantirToOsiConverter", +] \ No newline at end of file diff --git a/converters/ontology/src/osi/common/__init__.py b/converters/ontology/src/osi/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/converters/ontology/src/osi/common/file_utils.py b/converters/ontology/src/osi/common/file_utils.py new file mode 100644 index 0000000..9959052 --- /dev/null +++ b/converters/ontology/src/osi/common/file_utils.py @@ -0,0 +1,42 @@ +import io +import zipfile +from typing import Iterable + + +def iter_json_files_from_dir_in_zip(zf: zipfile.ZipFile, dir_prefix: str) -> Iterable[tuple[str, io.IOBase]]: + names = zf.namelist() + 
base_prefix = dir_prefix.rstrip("/") + "/" + roots = {n.split("/", 1)[0] for n in names if "/" in n} + candidate_prefixes = [base_prefix] + if len(roots) == 1: + root = next(iter(roots)) + candidate_prefixes.append(f"{root}/{base_prefix}") + + seen = set() + for name in names: + if name.endswith("/") or not name.lower().endswith(".json"): + continue + if any(name.startswith(p) for p in candidate_prefixes): + if name in seen: + continue + seen.add(name) + with zf.open(name, "r") as fp: + yield name, io.BytesIO(fp.read()) + +def open_top_level_file_from_zip(zf: zipfile.ZipFile, filename: str) -> io.IOBase: + names = set(zf.namelist()) + + def _open_to_io_base(n: str) -> io.IOBase: + with zf.open(n, "r") as fp: + return io.BytesIO(fp.read()) + + if filename in names: + return _open_to_io_base(filename) + + roots = {n.split("/", 1)[0] for n in names if "/" in n} + if len(roots) == 1: + candidate = f"{next(iter(roots))}/{filename}" + if candidate in names: + return _open_to_io_base(candidate) + + raise FileNotFoundError(f"Missing required top-level file: {filename}") \ No newline at end of file diff --git a/converters/ontology/src/osi/common/graph.py b/converters/ontology/src/osi/common/graph.py new file mode 100644 index 0000000..aa09f61 --- /dev/null +++ b/converters/ontology/src/osi/common/graph.py @@ -0,0 +1,180 @@ +from __future__ import annotations + +import warnings +from collections import defaultdict +from typing import TypeVar + +T = TypeVar("T") + + +def topological_sort(nodes: list[T], edges: list[tuple[T, T]]) -> list[T]: + order = _topological_sort(nodes, edges) + if order is None: + raise ValueError("The graph contains a cycle") + return order + + +def topological_sort_break_cycles(nodes: list[T], edges: list[tuple[T, T]]) -> tuple[list[T], list[tuple[T, T]]]: + order, removed_edges = _topological_sort_break_cycles(nodes, edges) + # `order` should always exist; defensive check: + if order is None: + raise ValueError("Could not break cycles to obtain a 
topological order") + + return order, removed_edges + + +def is_acyclic_graph(nodes: list[T], edges: list[tuple[T, T]]) -> bool: + return _topological_sort(nodes, edges) is not None + + +def _find_cycle_closing_edge_index( + nodes: list[T], + edge_list: defaultdict[T, list[tuple[T, int]]], + active: list[bool], + remaining_set: set[T], +) -> int | None: + """ + Find a cycle in the active subgraph induced by remaining_set and return the + index of a "cycle-closing" edge (a back-edge u->v where v is on the recursion stack). + """ + visited: set[T] = set() + on_stack: set[T] = set() + + def dfs(u: T) -> int | None: + visited.add(u) + on_stack.add(u) + + for v, eidx in edge_list.get(u, []): + if not active[eidx]: + continue + if v not in remaining_set: + continue + + if v not in visited: + found = dfs(v) + if found is not None: + return found + elif v in on_stack: + # Back-edge found: u -> v closes a directed cycle + return eidx + + on_stack.remove(u) + return None + + for start in nodes: + if start in remaining_set and start not in visited: + found = dfs(start) + if found is not None: + return found + + return None + + +def _topological_sort_break_cycles(nodes: list[T], edges: list[tuple[T, T]]) -> tuple[list[T] | None, list[tuple[T, T]]]: + """ + Returns (topological_order, removed_edges). + + Strategy: + - Run a Kahn-like process. + - When it gets stuck, detect a real cycle in the remaining subgraph via DFS + and remove the cycle-closing edge (back-edge) from that cycle. + - Continue until all nodes can be processed. + - Then run a clean topological sort once on the pruned edge list. 
+ """ + node_set = set(nodes) + + edge_list: defaultdict[T, list[tuple[T, int]]] = defaultdict(list) + active = [True] * len(edges) + + in_degree: dict[T, int] = {n: 0 for n in nodes} + for idx, (src, tgt) in enumerate(edges): + if src not in node_set or tgt not in node_set: + active[idx] = False + continue + edge_list[src].append((tgt, idx)) + in_degree[tgt] += 1 + + processed: set[T] = set() + removed_edges: list[tuple[T, T]] = [] + + work: list[T] = [n for n in nodes if in_degree.get(n, 0) == 0] + + while len(processed) < len(nodes): + if work: + n = work.pop() + if n in processed: + continue + processed.add(n) + + for neighbour, eidx in edge_list.get(n, []): + if not active[eidx]: + continue + in_degree[neighbour] -= 1 + if in_degree[neighbour] == 0: + work.append(neighbour) + continue + + remaining_set = {n for n in nodes if n not in processed} + + edge_idx = _find_cycle_closing_edge_index( + nodes=nodes, + edge_list=edge_list, + active=active, + remaining_set=remaining_set, + ) + if edge_idx is None: + raise ValueError("Cycle suspected but could not identify a cycle edge to remove") + + src, tgt = edges[edge_idx] + active[edge_idx] = False + removed_edges.append((src, tgt)) + warnings.warn(f"Cycle detected: removing cycle-closing edge {src!r} -> {tgt!r}") + + # Update in_degree to reflect edge removal + in_degree[tgt] -= 1 + if in_degree[tgt] == 0: + work.append(tgt) + + cleaned_edges = [e for i, e in enumerate(edges) if active[i]] + order = _topological_sort(nodes, cleaned_edges) + if order is None: + raise ValueError("Graph is still cyclic after cycle-breaking edge removals") + + return order, removed_edges + + +def _topological_sort(nodes: list[T], edges: list[tuple[T, T]]) -> list[T] | None: + order = [] + + # simple implementation of Kahn's Algorithm + + # index edges + edge_list = defaultdict(list) + for src, tgt in edges: + edge_list[src].append(tgt) + + # compute in_degree of nodes + in_degree = dict() + for _, tgt in edges: + if tgt in in_degree: + 
in_degree[tgt] = in_degree[tgt] + 1 + else: + in_degree[tgt] = 1 + + # start the working list with nodes that don't have incoming edges + work = list(filter(lambda n: n not in in_degree, nodes)) + while work: + n = work.pop() + order.append(n) + for neighbour in edge_list[n]: + new_in_degree = in_degree[neighbour] - 1 + in_degree[neighbour] = new_in_degree + if new_in_degree == 0: + work.append(neighbour) + + # all nodes sorted, return the order + if len(order) == len(nodes): + return order + + # some nodes were not sorted, so the graph is cyclic, return None + return None diff --git a/converters/ontology/src/osi/common/utils.py b/converters/ontology/src/osi/common/utils.py new file mode 100644 index 0000000..e2fa8bd --- /dev/null +++ b/converters/ontology/src/osi/common/utils.py @@ -0,0 +1,43 @@ +import logging +import re +from keyword import iskeyword + + +def camel_to_snake(name: str) -> str: + return re.sub(r'(? str: + words = re.split(r'[\s_\-\(\)<>:]+', text) + return ''.join(capitalize_first(word) for word in words)\ + .replace('[', '')\ + .replace(']', '_')\ + .replace('&', 'And') + +def capitalize_first(s): + return s[0].upper() + s[1:] if s else s + +digit_names = {'0': 'Zero', '1': 'One', '2': 'Two', '3': 'Three', '4': 'Four', + '5': 'Five', '6': 'Six', '7': 'Seven', '8': 'Eight', '9': 'Nine'} + +def to_verbalization_string(verb_string: str) -> str: + canonical_name = verb_string.lower().strip() + # replace ' ' and '-' with '_' + canonical_name = re.sub(r'[-\s]', '_', canonical_name) + # drop subsequent '_' + canonical_name = re.sub(r'_+', '_', canonical_name) + # replace unsupported symbols with '_' + new_name = re.sub(r'[^a-zA-Z0-9_-]', '_', canonical_name) + + if not new_name: + raise ValueError(f"Verbalization string {verb_string!r} reduces to an empty identifier after normalisation") + + # replace leading digits with alpha + if new_name[0].isdigit(): + new_name = digit_names[new_name[0]] + new_name[1:] + + if new_name != canonical_name: + 
logging.warning(f"Verbalization string {verb_string} has unsupported symbols. Replacing them with '_'") + if iskeyword(new_name): + new_name = f"{new_name}_k" + logging.warning(f"Verbalization string {verb_string} is a reserved keyword. Appending '_k' suffix.") + return new_name \ No newline at end of file diff --git a/converters/ontology/src/osi/converter/__init__.py b/converters/ontology/src/osi/converter/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/converters/ontology/src/osi/converter/osi_to_spec/__init__.py b/converters/ontology/src/osi/converter/osi_to_spec/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/converters/ontology/src/osi/converter/osi_to_spec/converter.py b/converters/ontology/src/osi/converter/osi_to_spec/converter.py new file mode 100644 index 0000000..6bc6656 --- /dev/null +++ b/converters/ontology/src/osi/converter/osi_to_spec/converter.py @@ -0,0 +1,271 @@ +"""Reverse converter: OsiOntology (runtime) -> OsiSpec (Pydantic DTO). 
+ +Pairs with spec_to_osi.SpecToOsiConverter so a full round-trip +yaml -> spec -> model -> spec -> yaml is structurally stable.""" + +from __future__ import annotations + +from osi.model import ( + Concept, + ConceptMapping, + ConceptType, + CustomExtension, + Dataset, + DatasetField, + DialectExpressionSet, + Dimension, + JoinPath, + LinkMapping, + SemanticModel, + Metric, + ObjectMapping, + OntologyComponent, + OntologyMapping, + ReferentMapping, + Relationship, + OsiOntology, +) +from osi.spec import ( + Concept as SpecConcept, + ConceptComponent, + ConceptMapping as SpecConceptMapping, + CustomExtension as SpecCustomExtension, + Dataset as SpecDataset, + DatasetField as SpecDatasetField, + DialectExpression as SpecDialectExpression, + Dimension as SpecDimension, + Expression as SpecExpression, + JoinPath as SpecJoinPath, + LinkMapping as SpecLinkMapping, + SemanticModel as SpecSemanticModel, + Metric as SpecMetric, + ObjectMapping as SpecObjectMapping, + OntologyMapping as SpecOntologyMapping, + OsiSpec, + ReferentMapping as SpecReferentMapping, + Relationship as SpecRelationship, + Role as SpecRole, +) + + +class OsiToSpecConverter: + """Top-level reverse converter.""" + + @staticmethod + def convert(model: OsiOntology) -> OsiSpec: + ont = model.ontology + ontology_mappings = [_convert_ontology_mapping(ontology_mapping) for ontology_mapping in model.ontology_mappings] + return OsiSpec( + version=model.version, + name=model.name, + description=model.description, + ai_context=model.ai_context, + ontology=_convert_ontology_concepts(ont), + ontology_mappings=ontology_mappings, + ) + + +# --------------------------------------------------------------------------- +# Ontology +# --------------------------------------------------------------------------- + +def _convert_ontology_concepts(ont: OntologyComponent) -> list[ConceptComponent]: + components: list[ConceptComponent] = [] + for concept in ont.concepts(): + rels = [rel for rel in ont.relationships if 
rel.container is concept] + if rels: + components.append( + ConceptComponent( + concept=_convert_concept(concept), + relationships=[_convert_relationship(rel) for rel in rels], + ) + ) + return components + + +def _convert_concept(concept: Concept) -> SpecConcept: + type_value: str | None = None + if isinstance(concept.type, ConceptType): + type_value = concept.type.value # type: ignore[union-attr] + extends = [p.name for p in concept.extends] if concept.extends else None + + identify_by: list[str] = [rel.name for rel in concept.identify_by.values()] + derived_by = [f.raw_expr for f in concept.derived_by] + requires = [f.raw_expr for f in concept.requires] + + return SpecConcept( + name=concept.name, + type=type_value, # type: ignore[arg-type] + description=concept.description, + extends=extends, + identify_by=identify_by, + derived_by=derived_by, + requires=requires, + ) + + +def _convert_relationship(rel: Relationship) -> SpecRelationship: + extra_roles = list(rel.roles)[1:] + roles = [SpecRole(concept=role.player.name, name=role.explicit_name) for role in extra_roles] + + multiplicity = rel.multiplicity.value if rel.multiplicity is not None else None + verbalizes = rel.verbalizes_raw if rel.verbalizes_raw is not None else [] + + return SpecRelationship( + name=rel.name, + description=rel.description, + roles=roles, + verbalizes=verbalizes, + multiplicity=multiplicity, # type: ignore[arg-type] + derived_by=[f.raw_expr for f in rel.derived_by], + requires=[f.raw_expr for f in rel.requires], + ) + + +# --------------------------------------------------------------------------- +# Semantic model +# --------------------------------------------------------------------------- + +def _convert_semantic_model(semantic_model: SemanticModel) -> SpecSemanticModel: + return SpecSemanticModel( + name=semantic_model.name, + description=semantic_model.description, + ai_context=semantic_model.ai_context, + datasets=[_convert_dataset(ds) for ds in semantic_model.datasets], + 
relationships=[_convert_join_path(jp) for jp in semantic_model.join_paths], + metrics=[_convert_metric(metric) for metric in semantic_model.metrics], + custom_extensions=[_convert_custom_extension(ce) for ce in semantic_model.custom_extensions], + ) + + +def _convert_dataset(ds: Dataset) -> SpecDataset: + return SpecDataset( + name=ds.name, + source=ds.source, + primary_key=ds.primary_key, + unique_keys=ds.unique_keys, + description=ds.description, + ai_context=ds.ai_context, + fields=[_convert_dataset_field(fl) for fl in ds.fields], + custom_extensions=[_convert_custom_extension(ce) for ce in ds.custom_extensions], + ) + + +def _convert_dataset_field(fl: DatasetField) -> SpecDatasetField: + return SpecDatasetField( + name=fl.name, + expression=_convert_expression(fl.expression), + dimension=_convert_dimension(fl.dimension), + label=fl.label, + description=fl.description, + ai_context=fl.ai_context, + custom_extensions=[_convert_custom_extension(ce) for ce in fl.custom_extensions], + ) + + +def _convert_expression(es: DialectExpressionSet) -> SpecExpression: + return SpecExpression( + dialects=[SpecDialectExpression(dialect=d.dialect, expression=d.expression) for d in es.dialects] + ) + + +def _convert_dimension(dim: Dimension | None) -> SpecDimension | None: + if dim is None: + return None + return SpecDimension(is_time=dim.is_time) + + +def _convert_join_path(jp: JoinPath) -> SpecJoinPath: + return SpecJoinPath( + name=jp.name, + **{"from": jp.from_dataset.name}, # `from` is a reserved word in Python + to=jp.to_dataset.name, + from_columns=[from_col.name for from_col in jp.from_columns], + to_columns=[to_col.name for to_col in jp.to_columns], + ai_context=jp.ai_context, + custom_extensions=[_convert_custom_extension(ce) for ce in jp.custom_extensions], + ) + + +def _convert_metric(metric: Metric) -> SpecMetric: + return SpecMetric( + name=metric.name, + expression=_convert_expression(metric.expression), + description=metric.description, + 
def _convert_object_mapping(object_mapping: ObjectMapping) -> SpecObjectMapping:
    """Translate a runtime object mapping into its spec DTO.

    The concept is emitted by name (None when the mapping has no concept), the
    expression is rendered back to its source string, and the referent mappings
    are converted one by one; an absent list stays None rather than becoming an
    empty list.
    """
    concept = object_mapping.concept
    concept_name = None if concept is None else concept.name

    referents = object_mapping.referent_mappings
    spec_referents = None if referents is None else [_convert_referent_mapping(rm) for rm in referents]

    return SpecObjectMapping(
        concept=concept_name,
        expression=_render_mapping_expression(object_mapping.expression),
        referent_mappings=spec_referents,
    )
SpecReferentMapping( + relationship=referent_mapping.relationship.name, + expression=_render_mapping_expression(referent_mapping.expression), + referent_mappings=nested, + ) + + +def _render_mapping_expression(expr) -> str | None: + """Reconstruct the source string for a parsed mapping expression. The + runtime model carries either a `DatasetField` (single field reference) + or a `Formula` (richer expression); both round-trip back to the same + string the forward converter saw in the spec.""" + if expr is None: + return None + from osi.model import DatasetField as _DF, Formula as _F + if isinstance(expr, _DF): + ds = expr.dataset + return f"{ds.name}.{expr.name}" if ds is not None else expr.name + if isinstance(expr, _F): + return expr.raw_expr + return str(expr) + + +def _convert_link_mapping(link_mapping: LinkMapping) -> SpecLinkMapping: + children = None + if link_mapping.children is not None: + children = [_convert_link_mapping(child) for child in link_mapping.children] + return SpecLinkMapping( + object_mapping=_convert_object_mapping(link_mapping.object_mapping), + relationship=link_mapping.relationship.name if link_mapping.relationship is not None else None, + children=children, + ) \ No newline at end of file diff --git a/converters/ontology/src/osi/converter/palantir_to_osi/__init__.py b/converters/ontology/src/osi/converter/palantir_to_osi/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/converters/ontology/src/osi/converter/palantir_to_osi/converter.py b/converters/ontology/src/osi/converter/palantir_to_osi/converter.py new file mode 100644 index 0000000..a357882 --- /dev/null +++ b/converters/ontology/src/osi/converter/palantir_to_osi/converter.py @@ -0,0 +1,778 @@ +"""Palantir `Ontology` -> `OsiOntology`.""" + +from __future__ import annotations + +import warnings + +from osi.common.graph import topological_sort_break_cycles +from osi.common.utils import to_pascal_case, to_verbalization_string +from osi.external.palantir.model 
@staticmethod
def convert(
    palantir_ontology: PalantirOntology,
    db_name: str = "palantir",
    schema_name: str = "palantir",
) -> OsiOntology:
    """Build an ``OsiOntology`` from a Palantir ontology.

    A fresh ontology component and semantic model are created and tied
    together by a single ontology mapping named ``palantir_map``. Concepts
    are converted first, relationships second; both passes append to one
    shared list of concept mappings, which is registered on the ontology
    mapping only at the end, in accumulation order.

    ``db_name`` and ``schema_name`` are forwarded unchanged to the concept
    conversion pass.
    """
    converter = PalantirToOsiConverter

    ontology_component = OntologyComponent()
    result = OsiOntology(name="Palantir model", ontology=ontology_component, version="0.1.0")
    semantic = SemanticModel(name="Palantir semantic model")
    mapping = OntologyMapping(
        name="palantir_map",
        ontology=ontology_component,
        semantic_model=semantic,
    )
    result.add_ontology_mapping(mapping)

    # Filled by both passes below; flushed into `mapping` afterwards.
    collected: list[ConceptMapping] = []

    converter._convert_concepts(
        ontology=ontology_component,
        semantic_model=semantic,
        palantir_ontology=palantir_ontology,
        concept_mappings=collected,
        db_name=db_name,
        schema_name=schema_name,
    )
    converter._convert_relationships(
        ontology=ontology_component,
        palantir_ontology=palantir_ontology,
        concept_mappings=collected,
        semantic_model=semantic,
    )

    for concept_mapping in collected:
        mapping.add_concept_mapping(concept_mapping)
    return result
@staticmethod
def _convert_object_type(
    ontology: OntologyComponent,
    semantic_model: SemanticModel,
    ot: ObjectType,
    subtype_relations: dict[ObjectType, ManyToOneRelation],
    ignore_subtype_relation_ids: set[str],
    concept_mappings: list[ConceptMapping],
    db_name: str,
    schema_name: str,
) -> None:
    """Create (or re-validate) the concept for one Palantir ObjectType.

    First encounter of the concept name:
      * the concept is created as an ENTITY_TYPE, extending its parent
        concept when `ot` has a subtype relation that is not listed in
        `ignore_subtype_relation_ids`;
      * one relationship is created per relevant property (active,
        experimental or intermediary);
      * for concepts that do not inherit, the primary-key relationships
        become the identifier and multiplicities are assigned.

    Later encounters only verify that every relevant property already has a
    relationship on the concept.

    In both cases the concept mappings for `ot` are then built via
    `_convert_mappings`.

    Raises:
        ValueError: if the parent concept of a subtype has not been created,
            if a primary-key relationship is missing, or if a re-encountered
            concept lacks a relationship for one of the relevant properties.
    """
    concept_name = PalantirToOsiConverter._concept_name(ot)
    relevant_props = [
        p for p in ot.properties().values()
        if p.active() or p.experimental() or p.intermediary()
    ]

    concept = ontology.lookup_concept(concept_name)
    if concept is None:
        subtype_relation = subtype_relations.get(ot)

        parent: Concept | None = None
        if (
            subtype_relation is not None
            and subtype_relation.guid() not in ignore_subtype_relation_ids
        ):
            parent_name = PalantirToOsiConverter._concept_name(
                subtype_relation.many_object_type()
            )
            parent = ontology.lookup_concept(parent_name)
            if parent is None:
                # Raised explicitly rather than asserted: an `assert` is
                # removed under `python -O`, which would let `extends=[None]`
                # through silently.
                raise ValueError(
                    f"Parent concept '{parent_name}' not found "
                    f"(expected from topological order)"
                )

        if parent is not None:
            concept = Concept(name=concept_name, type=ConceptType.ENTITY_TYPE, extends=[parent])
        else:
            concept = Concept(name=concept_name, type=ConceptType.ENTITY_TYPE)
        ontology.add_concept(concept)

        for prop in relevant_props:
            PalantirToOsiConverter._convert_property(ontology, concept, prop)

        if parent is None:
            # Root concept (or inheritance ignored): identify by its own keys.
            identifiers: dict[str, Relationship] = {}
            for prop in ot.primary_keys():
                prop_name = PalantirToOsiConverter._attribute_name(prop)
                rel = ontology.lookup_concept_relationship(concept, prop_name)
                if rel is None:
                    raise ValueError(
                        f"Identifier relationship '{concept_name}.{prop_name}' not found "
                        f"while wiring primary keys for ObjectType '{ot.name()}'."
                    )
                identifiers[rel.full_name] = rel
            concept.set_identify_by(identifiers)

            # Set multiplicities now that we know which relationship is the
            # sole identifier. A non-composite identifier is OneToOne; all
            # others stay ManyToOne.
            sole = next(iter(identifiers.values())) if len(identifiers) == 1 else None
            for prop in relevant_props:
                prop_name = PalantirToOsiConverter._attribute_name(prop)
                prop_rel = ontology.lookup_concept_relationship(concept, prop_name)
                if prop_rel is not None:
                    mult = (
                        RelationshipMultiplicity.ONE_TO_ONE
                        if prop_rel is sole
                        else RelationshipMultiplicity.MANY_TO_ONE
                    )
                    prop_rel.set_multiplicity(mult)
    else:
        # Re-encountered concept. Every relevant property must already have
        # its relationship; otherwise this ObjectType contributes fields the
        # first one did not declare, which produces an asymmetric model.
        for prop in relevant_props:
            prop_name = PalantirToOsiConverter._attribute_name(prop)
            if ontology.lookup_concept_relationship(concept, prop_name) is None:
                raise ValueError(
                    f"Concept '{concept_name}' refers to multiple datasets but not all "
                    f"contain the '{prop_name}' property."
                )

    PalantirToOsiConverter._convert_mappings(
        ontology, semantic_model, ot, subtype_relations, concept, concept_mappings, db_name, schema_name
    )
palantir_ds in ot.syncs_from(): + dataset = PalantirToOsiConverter._convert_dataset( + semantic_model, ontology, ot, palantir_ds, db_name, schema_name + ) + + # Build referent_mappings that locate `concept` instances by + # walking the (effective) identifying relationships against this + # dataset's columns. + id_referents: list[ReferentMapping] = [] + for prop in identifier_props: + prop_name = PalantirToOsiConverter._attribute_name(prop) + # For subtypes, identifying relationships live on the parent + # concept; the child reaches them via `lookup_concept_relationship`. + rel = ontology.lookup_concept_relationship(concept, prop_name) + if rel is None: + continue + field = PalantirToOsiConverter._get_dataset_field_by_palantir_property( + resolve(prop), palantir_ds, dataset + ) + if field is None: + continue + id_referents.append(ReferentMapping(relationship=rel, expression=field)) + + cm = ConceptMapping(concept=concept) + + # object_mappings: how to construct/identify this concept's + # instances from this dataset. Always uses referent_mappings to + # walk the identifying relationships (whether own or inherited). + cm.object_mappings.append( + ObjectMapping( + concept=parent_concept, + referent_mappings=list(id_referents) if id_referents else None, + ) + ) + + # link_mappings: the root identifies the source object (same as + # object_mapping), children populate each property relationship. + children: list[LinkMapping] = [] + primary_keys = set(ot.primary_keys()) + for prop in ot.properties().values(): + if not (prop.active() or prop.intermediary()): + continue + if prop in primary_keys: + continue + if not prop.pk_mapping() and prop.datasource_resource_id() != palantir_ds.guid(): + continue + if isinstance(prop.type(), ArrayDataType): + warnings.warn( + f"Skipping property '{prop.readable_id()}'. 
@staticmethod
def _convert_relationships(
    ontology: OntologyComponent,
    palantir_ontology: PalantirOntology,
    concept_mappings: list[ConceptMapping],
    semantic_model: SemanticModel,
) -> None:
    """Convert the ontology's relations, then its intermediary relations.

    A relation is converted when it is active or intermediary, or when it is
    an experimental ``ManyToOneRelation`` whose two object types are both
    active. An intermediary relation is converted when it is active or
    intermediary, or when it is experimental and its role-A, role-B and
    intermediary players are all active.
    """
    converter = PalantirToOsiConverter

    for relation in palantir_ontology.relations().values():
        selected = relation.active() or relation.intermediary()
        if not selected:
            selected = (
                isinstance(relation, ManyToOneRelation)
                and relation.experimental()
                and relation.one_object_type().active()
                and relation.many_object_type().active()
            )
        if selected:
            converter._convert_relation(ontology, relation, concept_mappings, semantic_model)

    for intermediary in palantir_ontology.intermediary_relations().values():
        selected = intermediary.active() or intermediary.intermediary()
        if not selected:
            selected = (
                intermediary.experimental()
                and intermediary.role_a_player().active()
                and intermediary.role_b_player().active()
                and intermediary.intermediary_player().active()
            )
        if selected:
            converter._convert_intermediary_relation(ontology, palantir_ontology, intermediary)
@staticmethod
def _attach_link_to_concept_mappings(
    ontology: OntologyComponent,
    rel: ManyToOneRelation,
    relationship: Relationship,
    mot: ObjectType,
    mot_concept: Concept,
    oot_concept: Concept,
    concept_mappings: list[ConceptMapping],
    semantic_model: SemanticModel,
) -> None:
    """Attach `relationship` as a link-mapping child for each dataset of `mot`.

    For every dataset `mot` syncs from, the matching ConceptMapping of
    `mot_concept` receives a child ``LinkMapping`` whose object mapping
    locates the `oot_concept` instance through the many-side columns listed
    in `rel.property_map()`.

    Nothing is attached when the property map is empty or when any target
    relationship cannot be found on `oot_concept`. A dataset is skipped when
    it is not registered in `semantic_model`, when no ConceptMapping exists
    for it (a warning is issued), or when one of its key columns cannot be
    resolved to a dataset field.

    Raises:
        ValueError: if the ConceptMapping has neither link mappings nor
            object mappings to serve as the link root.
    """
    converter = PalantirToOsiConverter

    fk_pairs = rel.property_map()
    if not fk_pairs:
        return

    # Pair each target relationship with the many-side property feeding it.
    # A single unresolved target relationship aborts the whole attachment.
    target_keys: list[tuple[Relationship, PalantirProperty]] = []
    for many_prop, one_prop in fk_pairs.items():
        target_rel = ontology.lookup_concept_relationship(
            oot_concept, converter._attribute_name(one_prop)
        )
        if target_rel is None:
            return
        target_keys.append((target_rel, many_prop))

    for source_ds in mot.syncs_from():
        ds_name = (
            f"{converter._concept_name(mot)}_{source_ds.readable_id()}"
        )
        dataset = semantic_model.lookup_dataset(ds_name)
        if dataset is None:
            continue

        owner_mapping = converter._find_concept_mapping(concept_mappings, mot_concept, dataset)
        if owner_mapping is None:
            warnings.warn(
                f"No ConceptMapping for entity '{mot_concept.name}' and dataset "
                f"'{ds_name}'; cannot attach link '{relationship.full_name}'"
            )
            continue

        # Resolve every key column; stop at the first one that is missing.
        referents: list[ReferentMapping] = []
        for target_rel, many_prop in target_keys:
            column_field = converter._get_dataset_field_by_palantir_property(
                many_prop, source_ds, dataset
            )
            if column_field is None:
                break
            referents.append(ReferentMapping(relationship=target_rel, expression=column_field))
        if len(referents) != len(target_keys):
            continue

        link_child = LinkMapping(
            object_mapping=ObjectMapping(concept=oot_concept, referent_mappings=referents),
            relationship=relationship,
        )

        if owner_mapping.link_mappings:
            # Reuse the existing root link mapping (the identifying tree).
            root_link = owner_mapping.link_mappings[0]
            if root_link.children is None:
                root_link.children = []
            root_link.children.append(link_child)
            continue

        if not owner_mapping.object_mappings:
            raise ValueError(
                f"Cannot attach link '{relationship.full_name}': concept "
                f"'{mot_concept.name}' has no identifying object mapping "
                f"to use as the link root."
            )
        # No root yet: synthesise one from the first object mapping.
        identifying = owner_mapping.object_mappings[0]
        owner_mapping.link_mappings.append(
            LinkMapping(
                object_mapping=ObjectMapping(
                    concept=identifying.concept,
                    referent_mappings=identifying.referent_mappings,
                ),
                children=[link_child],
            )
        )
+ + When multiple datasets feed the same concept we get one ConceptMapping + per dataset; pick the one whose referent expressions reference + `dataset`, falling back to the first candidate.""" + candidates = [cm for cm in concept_mappings if cm.concept is concept] + if len(candidates) <= 1: + return candidates[0] if candidates else None + return next( + (cm for cm in candidates if PalantirToOsiConverter._references_dataset(cm, dataset)), + candidates[0], + ) + + @staticmethod + def _references_dataset(cm: ConceptMapping, dataset: Dataset) -> bool: + """True iff any referent expression in `cm` points to a field of `dataset`.""" + return any( + isinstance(rm.expression, DatasetField) and rm.expression.dataset is dataset + for om in cm.object_mappings + for rm in (om.referent_mappings or []) + ) + + @staticmethod + def _convert_many_to_many(ontology: OntologyComponent, rel: ManyToManyRelation) -> None: + aot = rel.role_a_player() + aot_concept = ontology.lookup_concept(PalantirToOsiConverter._concept_name(aot)) + bot = rel.role_b_player() + bot_concept = ontology.lookup_concept(PalantirToOsiConverter._concept_name(bot)) + if aot_concept is None or bot_concept is None: + return + rel_name = PalantirToOsiConverter._attribute_name(rel) + + if aot_concept is bot_concept: + verbalize = f"{{{aot_concept}}} {rel_name} {{{bot_concept}:snd}}" + relates = [(bot_concept, "snd")] + else: + verbalize = f"{{{aot_concept}}} {rel_name} {{{bot_concept}}}" + relates = [(bot_concept, None)] + + relationship = Relationship( + name=rel_name, + container=aot_concept, + relates=relates, + verbalizes=[verbalize], + multiplicity=None, + ) + ontology.add_relationship(relationship) + + @staticmethod + def _convert_intermediary_relation( + ontology: OntologyComponent, + palantir_ontology: PalantirOntology, + rel: IntermediaryRelation, + ) -> None: + aot = rel.role_a_player() + aot_name = PalantirToOsiConverter._concept_name(aot) + aot_concept = ontology.lookup_concept(aot_name) + bot = 
rel.role_b_player() + bot_name = PalantirToOsiConverter._concept_name(bot) + bot_concept = ontology.lookup_concept(bot_name) + if aot_concept is None or bot_concept is None: + return + rel_name = PalantirToOsiConverter._attribute_name(rel) + + if aot_concept is bot_concept: + verbalize = f"{{{aot_concept}}} {rel_name} {{{bot_concept}:snd}}" + relates: list[tuple[Concept, str | None]] = [(bot_concept, "snd")] + else: + verbalize = f"{{{aot_concept}}} {rel_name} {{{bot_concept}}}" + relates = [(bot_concept, None)] + + relationship = Relationship( + name=rel_name, + container=aot_concept, + relates=relates, + verbalizes=[verbalize], + ) + ontology.add_relationship(relationship) + + rel_a = palantir_ontology.relations()[rel.relation_a()] + rel_a_name = PalantirToOsiConverter._attribute_name(rel_a) + rel_b = palantir_ontology.relations()[rel.relation_b()] + rel_b_name = PalantirToOsiConverter._attribute_name(rel_b) + + fp_a = PalantirToOsiConverter._concept_name( + rel_a.many_object_type() if isinstance(rel_a, ManyToOneRelation) else rel_a.role_a_player() + ) + sp_a = PalantirToOsiConverter._concept_name( + rel_a.one_object_type() if isinstance(rel_a, ManyToOneRelation) else rel_a.role_b_player() + ) + fp_b = PalantirToOsiConverter._concept_name( + rel_b.many_object_type() if isinstance(rel_b, ManyToOneRelation) else rel_b.role_a_player() + ) + sp_b = PalantirToOsiConverter._concept_name( + rel_b.one_object_type() if isinstance(rel_b, ManyToOneRelation) else rel_b.role_b_player() + ) + + assert (aot_name == fp_a and bot_name == fp_b) or ( + aot_name == sp_a and bot_name == sp_b + ), f"Invalid intermediary relation '{rel_name}' arguments." 
@staticmethod
def _convert_dataset(
    semantic_model: SemanticModel,
    ontology: OntologyComponent,
    ot: ObjectType,
    palantir_ds: PalantirDataSet,
    db_name: str,
    schema_name: str,
) -> Dataset:
    """Return the OSI dataset for `(ot, palantir_ds)`, creating it on first use.

    The dataset is named ``<ConceptName>_<dataset readable id>``. If the
    semantic model already holds a dataset with that name it is returned
    unchanged. Otherwise one field is created per column (columns whose type
    is exactly ``ARRAY``, case-insensitively, are skipped), the dataset is
    registered on `semantic_model` and returned. Its ``source`` is
    ``<db_name>.<schema_name>.<dataset readable id>``.

    Raises:
        ValueError: propagated from `_resolve_field_type` when a column's
            type concept is not defined in the ontology.
    """
    ds_name = f"{PalantirToOsiConverter._concept_name(ot)}_{palantir_ds.readable_id()}"
    existing = semantic_model.lookup_dataset(ds_name)
    if existing is not None:
        return existing

    fields: list[DatasetField] = []
    for column in palantir_ds.columns():
        # A column may carry no type: `_resolve_field_type` treats a falsy
        # type as "String". Guard here so such a column does not crash on
        # `.upper()` before it gets that far.
        column_type = column.type()
        if column_type and column_type.upper() == "ARRAY":
            continue
        field_name = PalantirToOsiConverter._normalize_field_name(column.name())
        fields.append(
            DatasetField(
                name=field_name,
                expression=DialectExpressionSet(
                    dialects=[
                        DialectExpression(dialect=_DEFAULT_DIALECT, expression=field_name)
                    ]
                ),
                type=PalantirToOsiConverter._resolve_field_type(ontology, palantir_ds, column),
            )
        )

    dataset = Dataset(
        name=ds_name,
        source=f"{db_name}.{schema_name}.{palantir_ds.readable_id()}",
        fields=fields,
        description=palantir_ds.description(),
    )
    semantic_model.add_dataset(dataset)
    return dataset
@staticmethod
def _type_to_madlib_suffix(type_, arr_depth: int = 1) -> str:
    """Build the trailing part of a property verbalization for `type_`.

    Each array level contributes ``{Integer:<role>} maps to `` where
    ``<role>`` is the role name for the current depth (starting at
    `arr_depth`); the innermost non-array type contributes ``{<type>}``.
    Depths with no role name make `_depth_role_name` raise.
    """
    pieces: list[str] = []
    current = type_
    level = arr_depth
    # Peel array layers outermost-first, one index role per layer.
    while isinstance(current, ArrayDataType):
        role = PalantirToOsiConverter._depth_role_name(level)
        pieces.append(f"{{Integer:{role}}} maps to ")
        current = current.base_type()
        level += 1
    pieces.append(f"{{{current.to_type()}}}")
    return "".join(pieces)
+ ) + roles.append((target, None)) + return roles + + @staticmethod + def _depth_role_name(depth: int) -> str: + name = PalantirToOsiConverter.depths_role_names.get(depth) + if not name: + raise Exception(f"Array types of depth {depth} are not supported") + return name + + @staticmethod + def _get_dataset_field_by_palantir_property( + prop: PalantirProperty, palantir_ds: PalantirDataSet, dataset: Dataset + ) -> DatasetField | None: + column_name = prop.column_name() + pk_mapping = prop.pk_mapping() + ds_guid = palantir_ds.guid() + if pk_mapping: + if ds_guid not in pk_mapping: + raise ValueError( + f"Primary key mapping for Palantir DataSet '{palantir_ds.readable_id()}' " + f"is missing property '{PalantirToOsiConverter._attribute_name(prop)}'" + ) + column_name = pk_mapping[ds_guid] + if not column_name: + return None + field = dataset.field(PalantirToOsiConverter._normalize_field_name(column_name)) + if not field: + warnings.warn(f"Dataset '{dataset.name}' does not contain a field named '{column_name}'") + return field + + @staticmethod + def _normalize_field_name(name: str) -> str: + normalized = name.replace("-", "_") + if normalized and normalized[0].isdigit(): + normalized = f"_{normalized}" + return normalized diff --git a/converters/ontology/src/osi/converter/spec_to_osi/__init__.py b/converters/ontology/src/osi/converter/spec_to_osi/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/converters/ontology/src/osi/converter/spec_to_osi/converter.py b/converters/ontology/src/osi/converter/spec_to_osi/converter.py new file mode 100644 index 0000000..a11a39c --- /dev/null +++ b/converters/ontology/src/osi/converter/spec_to_osi/converter.py @@ -0,0 +1,499 @@ +"""Converter from OsiSpec (Pydantic DTOs) to OsiOntology (runtime semantic model).""" + +from __future__ import annotations + +import re + +from osi.common.graph import topological_sort +from osi.model import ( + Concept, + ConceptMapping, + ConceptType, + CustomExtension, + Dataset, + 
DatasetField, + DialectExpression, + DialectExpressionSet, + Dimension, + Formula, + JoinPath, + LinkMapping, + SemanticModel, + Metric, + ObjectMapping, + OntologyComponent, + OntologyMapping, + ReferentMapping, + Relationship, + RelationshipMultiplicity, + OsiOntology +) +from osi.spec import ( + Concept as SpecConcept, + ConceptMapping as SpecConceptMapping, + CustomExtension as SpecCustomExtension, + Dataset as SpecDataset, + DatasetField as SpecDatasetField, + DialectExpression as SpecDialectExpression, + Dimension as SpecDimension, + Expression as SpecExpression, + JoinPath as SpecJoinPath, + LinkMapping as SpecLinkMapping, + SemanticModel as SpecSemanticModel, + Metric as SpecMetric, + ObjectMapping as SpecObjectMapping, + OntologyMapping as SpecOntologyMapping, + OsiSpec, + ReferentMapping as SpecReferentMapping, + Relationship as SpecRelationship, +) +Container = Concept | Relationship + +# A mapping expression is treated as a single field reference when it matches +# `DATASET.field` or a bare `field` identifier — no parsing, just a pattern check. +_QUALIFIED_FIELD_RE = re.compile(r"^\s*([A-Za-z_][A-Za-z0-9_]*)\s*\.\s*([A-Za-z_][A-Za-z0-9_]*)\s*$") +_BARE_FIELD_RE = re.compile(r"^\s*([A-Za-z_][A-Za-z0-9_]*)\s*$") + + +class SpecToOsiConverter: + """Top-level converter. 
@staticmethod
def _populate_ontology(ontology: OntologyComponent, spec: OsiSpec) -> None:
    """Fill `ontology` from the concept components declared in `spec`.

    Runs four passes over ``spec.ontology``:

    1. concepts, in the order given by `_sort_spec_dependency_graph`, so
       each concept's ``extends`` targets can be looked up when it is built;
    2. relationships of every concept;
    3. ``identify_by`` references, resolved against those relationships;
    4. ``requires`` / ``derived_by`` rules of concepts and relationships.

    Raises:
        ValueError: if a concept extends an undeclared concept, if a
            container concept cannot be found, or if ``identify_by`` names
            an unknown relationship.
    """
    concept_specs = {
        component.concept.name: component.concept for component in spec.ontology
    }
    sorted_names = SpecToOsiConverter._sort_spec_dependency_graph(list(concept_specs.values()))

    # Pass 1: concepts.
    for name in sorted_names:
        concept_spec = concept_specs[name]
        extends: list[Concept] = []
        for ext in concept_spec.extends or []:
            parent = ontology.lookup_concept(ext)
            if parent is None:
                # `ext` is the concept being extended, i.e. the supertype.
                raise ValueError(
                    f"Supertype '{ext}' extended by concept '{concept_spec.name}' "
                    f"is not declared in ontology '{spec.name}'."
                )
            extends.append(parent)
        ontology.add_concept(
            Concept(
                name=concept_spec.name,
                type=ConceptType.from_value(concept_spec.type),
                description=concept_spec.description,
                extends=extends,
            )
        )

    # Pass 2: relationships.
    for component in spec.ontology:
        container = ontology.lookup_concept(component.concept.name)
        if container is None:
            raise ValueError(f"Internal: container concept '{component.concept.name}' not found")
        for rel_spec in component.relationships:
            SpecToOsiConverter._convert_relationship(ontology, container, rel_spec)

    # Pass 3: identifiers, now that all relationships exist.
    for component in spec.ontology:
        concept_spec = component.concept
        concept = ontology.lookup_concept(concept_spec.name)
        if concept is None:
            continue
        identifiers: dict[str, Relationship] = {}
        for ref_name in concept_spec.identify_by:
            rel = ontology.lookup_concept_relationship(concept, ref_name)
            if rel is None:
                raise ValueError(
                    f"identify_by '{ref_name}' on concept '{concept.name}' refers to an "
                    f"unknown relationship in ontology '{spec.name}'."
                )
            identifiers[rel.full_name] = rel
        concept.set_identify_by(identifiers)

    def attach_rules(owner, owner_spec) -> None:
        # Register `requires` first, then `derived_by`, on both the owner
        # (concept or relationship) and the ontology. Falsy rules are skipped.
        for raw in owner_spec.requires:
            requirement = _build_rule(raw, owner)
            if requirement:
                owner.add_require(requirement)
                ontology.add_require(requirement)
        for raw in owner_spec.derived_by:
            rule = _build_rule(raw, owner)
            if rule:
                owner.add_derived_by(rule)
                ontology.add_rule(rule)

    # Pass 4: formulas (after concepts and relationships exist).
    for component in spec.ontology:
        concept_spec = component.concept
        concept = ontology.lookup_concept(concept_spec.name)
        if concept is None:
            continue
        attach_rules(concept, concept_spec)
        for rel_spec in component.relationships:
            rel = ontology.lookup_concept_relationship(concept, rel_spec.name)
            if rel is None:
                continue
            attach_rules(rel, rel_spec)
@staticmethod
def _convert_semantic_model(lm_spec: SpecSemanticModel) -> SemanticModel:
    """Build a runtime ``SemanticModel`` from its spec DTO.

    Name, description, AI context and converted custom extensions are set at
    construction. Datasets are added first, then join paths (built from the
    spec's ``relationships`` and given the model under construction), then
    metrics.
    """
    extensions = [
        _convert_custom_extension(extension) for extension in lm_spec.custom_extensions
    ]
    built = SemanticModel(
        name=lm_spec.name,
        description=lm_spec.description,
        ai_context=lm_spec.ai_context,
        custom_extensions=extensions,
    )

    for dataset_spec in lm_spec.datasets:
        converted_dataset = _convert_dataset(dataset_spec)
        built.add_dataset(converted_dataset)

    for join_spec in lm_spec.relationships:
        converted_join = _convert_join_path(join_spec, built)
        built.add_join_path(converted_join)

    for metric_spec in lm_spec.metrics:
        converted_metric = _convert_metric(metric_spec)
        built.add_metric(converted_metric)

    return built
ontology.lookup_concept(cm_spec.concept) + if concept is None: + raise ValueError( + f"ConceptMapping references unknown concept '{cm_spec.concept}' in ontology '{model.name}'." + ) + cm = ConceptMapping(concept=concept) + for object_mapping_spec in cm_spec.object_mappings: + cm.object_mappings.append( + SpecToOsiConverter._convert_object_mapping( + model, ontology, semantic_model, concept, object_mapping_spec + ) + ) + for link_mapping_spec in cm_spec.link_mappings: + cm.link_mappings.append( + SpecToOsiConverter._convert_link_mapping( + model, ontology, semantic_model, concept, link_mapping_spec + ) + ) + return cm + + @staticmethod + def _convert_object_mapping( + model: OsiOntology, + ontology: OntologyComponent, + semantic_model: SemanticModel, + container: Concept, + om_spec: SpecObjectMapping, + ) -> ObjectMapping: + concept: Concept | None = None + if om_spec.concept: + concept = ontology.lookup_concept(om_spec.concept) + if concept is None: + raise ValueError( + f"ObjectMapping references unknown concept '{om_spec.concept}' in ontology " + f"'{model.name}'." 
@staticmethod
def _convert_referent_mapping(
    model: OsiOntology,
    ontology: OntologyComponent,
    semantic_model: SemanticModel,
    container: Concept,
    rm_spec: SpecReferentMapping,
) -> ReferentMapping:
    """Convert one referent-mapping spec, recursing into nested referent mappings.

    ``rm_spec.relationship`` is resolved against ``container``. The player of
    that relationship's last role is used both as the expected type when the
    spec's expression is resolved and as the container for nested mappings.

    Raises:
        ValueError: if the relationship cannot be found on ``container``.
    """
    relationship = ontology.lookup_concept_relationship(container, rm_spec.relationship)
    if relationship is None:
        raise ValueError(
            f"ReferentMapping references unknown relationship "
            f"'{container.name}.{rm_spec.relationship}' in ontology '{model.name}'."
        )

    player = relationship.last_role.player

    # Both the expression and the nested list are optional in the spec;
    # an absent value stays ``None`` rather than becoming an empty default.
    resolved = (
        None
        if rm_spec.expression is None
        else _resolve_mapping_expression(rm_spec.expression, semantic_model, player)
    )

    if rm_spec.referent_mappings is None:
        children = None
    else:
        children = [
            SpecToOsiConverter._convert_referent_mapping(
                model, ontology, semantic_model, player, child_spec
            )
            for child_spec in rm_spec.referent_mappings
        ]

    return ReferentMapping(relationship=relationship, expression=resolved, referent_mappings=children)
@staticmethod
def _sort_spec_dependency_graph(concepts: list[SpecConcept]) -> list[str]:
    """Order concept names by running ``topological_sort`` over ``extends`` links.

    Every concept contributes one node (its name) and, for each entry of its
    ``extends`` list, one edge ``(extended_name, concept_name)``.
    """
    names = [spec.name for spec in concepts]
    # A missing or empty ``extends`` contributes no edges.
    extends_edges: list[tuple[str, str]] = [
        (base_name, spec.name)
        for spec in concepts
        for base_name in (spec.extends or ())
    ]
    return topological_sort(names, extends_edges)
+ """ + qualified = _QUALIFIED_FIELD_RE.match(expression) + if qualified: + ds_name, field_name = qualified.group(1), qualified.group(2) + dataset = semantic_model.lookup_dataset(ds_name) + if dataset is not None: + field = dataset.field(field_name) + if field is not None: + _pin_field_type(field, expected_type) + return field + return Formula(raw_expr=expression) + + bare = _BARE_FIELD_RE.match(expression) + if bare: + field_name = bare.group(1) + for dataset in semantic_model.datasets: + field = dataset.field(field_name) + if field is not None: + _pin_field_type(field, expected_type) + return field + return Formula(raw_expr=expression) + + return Formula(raw_expr=expression) + + +def _pin_field_type(field: DatasetField, expected_type: Concept | None) -> None: + if expected_type is None: + return + if field.type is None: + field.type = expected_type + return + if field.type is not expected_type: + raise ValueError( + f"Field '{field.name}' is already mapped as concept " + f"'{field.type.name}' but this mapping expects " + f"'{expected_type.name}'. A dataset field can only be " + f"bound to one ontology concept type." 
+ ) + + +def _convert_custom_extension(ce: SpecCustomExtension) -> CustomExtension: + return CustomExtension(vendor_name=ce.vendor_name, data=ce.data) + + +def _convert_expression(expr: SpecExpression) -> DialectExpressionSet: + return DialectExpressionSet( + dialects=[_convert_dialect_expression(d) for d in expr.dialects] + ) + + +def _convert_dialect_expression(dialect_expr: SpecDialectExpression) -> DialectExpression: + return DialectExpression(dialect=dialect_expr.dialect, expression=dialect_expr.expression) + + +def _convert_dimension(dim: SpecDimension | None) -> Dimension | None: + if dim is None: + return None + return Dimension(is_time=dim.is_time) + + +def _convert_dataset_field(fl: SpecDatasetField) -> DatasetField: + return DatasetField( + name=fl.name, + expression=_convert_expression(fl.expression), + dimension=_convert_dimension(fl.dimension), + label=fl.label, + description=fl.description, + ai_context=fl.ai_context, + custom_extensions=[_convert_custom_extension(ce) for ce in fl.custom_extensions], + ) + + +def _convert_dataset(ds: SpecDataset) -> Dataset: + fields = [_convert_dataset_field(fl) for fl in ds.fields] + return Dataset( + name=ds.name, + source=ds.source, + fields=fields, + primary_key=ds.primary_key, + unique_keys=ds.unique_keys, + description=ds.description, + ai_context=ds.ai_context, + custom_extensions=[_convert_custom_extension(ce) for ce in ds.custom_extensions], + ) + + +def _convert_join_path(jp: SpecJoinPath, lm: SemanticModel) -> JoinPath: + from_dataset = lm.lookup_dataset(jp.from_) + to_dataset = lm.lookup_dataset(jp.to) + if from_dataset is None: + raise ValueError(f"JoinPath '{jp.name}': unknown 'from' dataset '{jp.from_}'.") + if to_dataset is None: + raise ValueError(f"JoinPath '{jp.name}': unknown 'to' dataset '{jp.to}'.") + from_columns: list[DatasetField] = [] + for col in jp.from_columns: + field = from_dataset.field(col) + if field is None: + raise ValueError( + f"JoinPath '{jp.name}': column '{col}' not found in 
class DataType(Enum):
    """Enumeration of the base data types used by this Palantir model."""

    ANY = 1
    ATTACHMENT = 2
    BOOLEAN = 3
    DATE = 4
    DECIMAL = 5
    DOUBLE = 6
    FLOAT = 7
    GEOHASH = 8
    GEOPOINT = 9
    GEOSHAPE = 10
    INTEGER = 11
    LONG = 12
    SHORT = 13
    STRING = 14
    TIMESERIES = 15
    TIMESTAMP = 16
    TIME_DEPENDENT = 17
    STRUCT = 18
    VECTOR = 19
    MEDIA_REFERENCE = 20
    CIPHER_TEXT = 21

    @staticmethod
    def parse_datatype(name: str) -> "DataType":
        """Return the member whose name equals ``name``, ignoring case.

        Raises:
            ValueError: if ``name`` is not a string or names no member. The
                underlying lookup error is kept as ``__cause__``.
        """
        try:
            return DataType[name.upper()]
        except (KeyError, AttributeError) as err:
            # AttributeError covers non-string input such as None, so callers
            # only ever have to handle ValueError.
            raise ValueError(f"Unrecognized data type: {name}") from err

    def to_type(self) -> str:
        """Return the type-name string for this data type.

        Members without an explicit entry in the lookup table fall back to
        ``"Integer"``.
        """
        return _DATA_TYPE_NAMES.get(self, "Integer")


# Explicit DataType -> type-name entries; every other member maps to "Integer".
_DATA_TYPE_NAMES = {
    DataType.STRING: "String",
    DataType.GEOHASH: "String",
    DataType.GEOSHAPE: "String",
    DataType.GEOPOINT: "String",
    DataType.TIMESERIES: "String",
    DataType.INTEGER: "Integer",
    DataType.DECIMAL: "Decimal",
    DataType.FLOAT: "Float",
    DataType.DOUBLE: "Float",
    DataType.TIME_DEPENDENT: "Float",
    DataType.BOOLEAN: "Boolean",
    DataType.TIMESTAMP: "DateTime",
    DataType.DATE: "Date",
}
rid): + super().__init__(guid, rid) + self._description: str | None = None + self._path: str | None = None + self._columns: list[DataSetColumn] = [] + self._depends_on: list[DataSet] = [] + + def description(self): + return self._description + + def path(self): + return self._path + + def columns(self): + return self._columns + + def depends_on(self): + return self._depends_on + + def info(self, indent: int = 0, visited: set[str] | None = None) -> str: + """ + Pretty-print this dataset with indentation and handle dependency graph + (avoids infinite recursion on cycles by tracking visited mainDatasetIds). + """ + pad = " " * indent + if visited is None: + visited = set() + lines: list[str] = [] + ds_id = str(self._guid) if self._guid is not None else "None" + name = str(self._readable_id) if self._readable_id is not None else "None" + + header = f'{pad}DataSet(id="{ds_id}", name="{name}")' + lines.append(header) + if self._description: + lines.append(f"{pad} description: {self._description}") + if self._path: + lines.append(f"{pad} path: {self._path}") + + # Columns + if self._columns: + lines.append(f"{pad} columns:") + for col in self._columns: + lines.append(col.info(indent + 4)) + + # Dependencies + if self._depends_on: + lines.append(f"{pad} depends_on:") + if ds_id in visited: + lines.append(f"{pad} ") + else: + visited.add(ds_id) + for dep in self._depends_on: + # Safeguard when dep is None + if dep is None: + lines.append(f"{pad} ") + continue + lines.append(dep.info(indent + 4, visited)) + visited.remove(ds_id) + + return "\n".join(lines) + + def __str__(self) -> str: + return self.info() + +class DataSetColumn: + def __init__(self, name, t, ds: DataSet): + self._type = t + self._name = name + self._part_of = ds + + def name(self): + return self._name + + def type(self): + return self._type + + def part_of(self) -> DataSet: + return self._part_of + + def info(self, indent: int = 0) -> str: + return f'{" " * indent}Column(name="{self._name}", 
class Ontology:
    """In-memory Palantir ontology: object types, relations and data sets.

    Object types and relations are each indexed twice, by guid and by
    readable id. Intermediary relations and data sets are kept in their own
    dictionaries.
    """

    _data_sets: "dict[str, DataSet]"
    _object_types: "dict[str, ObjectType]"
    _object_types_by_readable_id: "dict[str, ObjectType]"
    _relations: "dict[str, Relation]"
    _relations_by_readable_id: "dict[str, Relation]"
    _intermediary_relations: "dict[str, IntermediaryRelation]"

    def __init__(self):
        self._data_sets = {}
        self._object_types = {}
        self._object_types_by_readable_id = {}
        self._relations = {}
        self._relations_by_readable_id = {}
        self._intermediary_relations = {}

    def add_object_type(self, ot):
        """Register ``ot`` under its guid and readable id; returns ``self``."""
        self._object_types[ot.guid()] = ot
        self._object_types_by_readable_id[ot.readable_id()] = ot
        return self

    def add_relation(self, rel):
        """Register ``rel`` under its guid and readable id; returns ``self``."""
        self._relations[rel.guid()] = rel
        self._relations_by_readable_id[rel.readable_id()] = rel
        return self

    def object_types(self):
        """Return the guid -> object type dictionary."""
        return self._object_types

    def object_type_by_readable_id(self, rid):
        """Return the object type with readable id ``rid`` (``KeyError`` if absent)."""
        return self._object_types_by_readable_id[rid]

    def relations(self):
        """Return the guid -> relation dictionary."""
        return self._relations

    def relation_by_readable_id(self, rid):
        """Return the relation with readable id ``rid`` (``KeyError`` if absent)."""
        return self._relations_by_readable_id[rid]

    def intermediary_relations(self):
        """Return the dictionary of intermediary relations."""
        return self._intermediary_relations

    def data_sets(self):
        """Return the dictionary of data sets."""
        return self._data_sets

    def set_data_sets(self, data_sets: "dict[str, DataSet]"):
        """Replace the dictionary of data sets."""
        self._data_sets = data_sets

    @staticmethod
    def _indented(text: str, indentation: str) -> str:
        """Prefix every line of ``text`` with ``indentation``."""
        if not indentation:
            return text
        return "\n".join(indentation + line for line in text.split("\n"))

    def info(self, indentation="") -> str:
        """Describe object types, data sets, relations and intermediary relations.

        Each group is sorted by guid (columns by name) so the output is
        stable, and every entry is followed by an empty line.
        ``indentation`` is prefixed to every non-separator line, including
        the continuation lines of multi-line entries.
        """

        def by_guid(resource):
            return resource.guid()

        result: list[str] = []

        for ot in sorted(self._object_types.values(), key=by_guid):
            result.append(self._indented(ot.info(), indentation))
            result.append("")

        for ds in sorted(self._data_sets.values(), key=by_guid):
            result.append(indentation + f"Data set '{ds.readable_id()}':")
            for col in sorted(ds.columns(), key=lambda c: c.name()):
                result.append(indentation + f" Column '{col.name()}' of type '{col.type()}'")
            result.append("")

        for rel in sorted(self._relations.values(), key=by_guid):
            result.append(self._indented(rel.info(), indentation))
            result.append("")

        for ir in sorted(self._intermediary_relations.values(), key=by_guid):
            result.append(self._indented(ir.info(), indentation))
            result.append("")

        return "\n".join(result)

    def subtypes_relations(self) -> "dict[ObjectType, ManyToOneRelation]":
        """Return the many-to-one relations that link primary keys to primary keys.

        A ``ManyToOneRelation`` qualifies when all of the following hold:

        * it is active, or it is experimental and both of its object types
          are active;
        * its property map is non-empty;
        * in every pair of the map, the key is a primary key of the "many"
          object type and the value is a primary key of the "one" object type.

        The result is keyed by the "one"-side object type. If several
        relations qualify for the same type, the last one iterated wins.
        """
        result = {}
        for rel in self._relations.values():
            if not isinstance(rel, ManyToOneRelation):
                continue

            one_ot = rel.one_object_type()
            many_ot = rel.many_object_type()

            experimental_ok = rel.experimental() and one_ot.active() and many_ot.active()
            if not (rel.active() or experimental_ok):
                continue

            property_map = rel.property_map()
            if not property_map:
                continue

            many_keys = many_ot.primary_keys()
            one_keys = one_ot.primary_keys()
            if all(
                mprop in many_keys and oprop in one_keys
                for mprop, oprop in property_map.items()
            ):
                result[one_ot] = rel

        return result
class ObjectType(Resource):
    """A Palantir object type: named properties, primary keys and backing data.

    Instances are identified by their primary-key properties. The data sets an
    object type syncs from and its data sources are mandatory once the model
    is populated, so their accessors raise while they are still empty.
    """

    def __init__(self, guid, rid, name):
        super().__init__(guid, rid)
        self._name = name
        self._type_groups = []
        self._syncs_from = []
        self._properties = {}
        self._pk_properties = set()
        self._data_sources = []

    def lookup_property_by_reading(self, pname):
        """Return the first property whose readable id equals ``pname``, else ``None``."""
        return next(
            (prop for prop in self._properties.values() if prop.readable_id() == pname),
            None,
        )

    def name(self):
        """Return the object type's name."""
        return self._name

    def type_groups(self):
        """Return the list of type groups this object type belongs to."""
        return self._type_groups

    def syncs_from(self):
        """Return the data sets this object type syncs from.

        Raises:
            RuntimeError: if no data set has been registered yet.
        """
        if not self._syncs_from:
            raise RuntimeError(f"Mandatory constraint violation: ObjectType '{self.readable_id()}' must sync with some DataSet")
        return self._syncs_from

    def sync_from_data_set(self, ds):
        """Register ``ds`` as a data set this object type syncs from; returns ``self``."""
        self._syncs_from.append(ds)
        return self

    def properties(self):
        """Return the properties container."""
        return self._properties

    def primary_keys(self):
        """Return the container of primary-key properties."""
        return self._pk_properties

    def data_sources(self) -> list[DataSource]:
        """Return the backing data sources.

        Raises:
            RuntimeError: if no data source has been registered yet.
        """
        if not self._data_sources:
            raise RuntimeError(f"Mandatory constraint violation: ObjectType '{self.readable_id()}' must have some data source")
        return self._data_sources

    def set_properties(self, properties):
        """Replace the properties container."""
        self._properties = properties

    def set_primary_keys(self, pk_properties):
        """Replace the container of primary-key properties."""
        self._pk_properties = pk_properties

    def info(self, indent: int = 0) -> str:
        """Return a multi-line description of this object type.

        The header lists the readable ids of the primary keys. When the keys
        are held in a set they are sorted, because set iteration order is not
        stable between runs; an ordered container keeps its own order.
        """
        key_ids = [str(prop.readable_id()) for prop in self._pk_properties]
        if isinstance(self._pk_properties, (set, frozenset)):
            key_ids.sort()
        keys = ", ".join(key_ids)

        result: list[str] = [f'{" " * indent}Object type "{self._name}({keys})":']
        for ds in self._syncs_from:
            result.append(f'{" " * (indent + 4)}Syncs from "{ds.readable_id()}"')
        if self._properties:
            for p in self._properties.values():
                result.append(p.info(indent + 4))
        if self._type_groups:
            result.append(f'{" " * (indent + 4)}Belongs to type groups:')
            for tg in self._type_groups:
                result.append(f'{" " * (indent + 8)} "{tg}"')
        return "\n".join(result)

    def __str__(self) -> str:
        return self.info()
class Property(Resource):
    """A property of an ``ObjectType``, with its data type and backing column."""

    def __init__(self, guid, rid, t, ot: ObjectType, column_name, datasource_resource_id):
        super().__init__(guid, rid)
        self._type = t
        self._part_of = ot
        self._column_name = column_name
        self._datasource_resource_id = datasource_resource_id
        # Rendered by info() as dataset -> primary-key column.
        self._pk_mapping = {}

    def part_of(self) -> ObjectType:
        """Return the object type that owns this property."""
        return self._part_of

    def type(self):
        """Return the property's data type."""
        return self._type

    def column_name(self):
        """Return the name of the backing column."""
        return self._column_name

    def datasource_resource_id(self):
        """Return the resource id of the backing data source."""
        return self._datasource_resource_id

    def pk_mapping(self):
        """Return the primary-key mapping dictionary."""
        return self._pk_mapping

    def info(self, indent: int = 0) -> str:
        """Return a description line plus one detail line per backing reference.

        The plain column reference is shown only when there is no primary-key
        mapping. Otherwise each primary-key mapping entry gets its own line.
        """
        detail_pad = " " * (indent + 4)
        parts = [f'{" " * indent}Property "{self.readable_id()}" has data type "{self._type!s}"']
        if self._column_name and not self._pk_mapping:
            parts.append(
                f'{detail_pad}Refers to "{self._column_name!s}" of "{self._datasource_resource_id}" dataset'
            )
        if self._pk_mapping:
            parts.extend(
                f'{detail_pad}Maps to primary key column "{column}" of "{dataset}" dataset'
                for dataset, column in self._pk_mapping.items()
            )
        return "\n".join(parts)

    def __str__(self) -> str:
        return self.info()
class Relation(Resource):
    """Base class for relations between object types.

    Construction is inherited unchanged from ``Resource`` (``guid``, ``rid``).
    Subclasses override ``info`` to describe themselves.
    """

    def info(self) -> str:
        """Return a description of the relation; the base class has none."""
        return ""
class ManyToOneRelation(Relation):
    """Binary relation between a "many"-side and a "one"-side object type.

    ``property_map`` maps each property of the "many" object type to the
    property of the "one" object type it corresponds to.
    """

    def __init__(self, guid, rid, many_object_type: ObjectType, one_object_type: ObjectType,
                 property_map: dict[Property, Property]):
        super().__init__(guid, rid)
        self._many_object_type = many_object_type
        self._one_object_type = one_object_type
        # Direction: many-side property -> one-side property.
        self._property_map = property_map

    def many_object_type(self):
        """Return the object type on the "many" side."""
        return self._many_object_type

    def one_object_type(self):
        """Return the object type on the "one" side."""
        return self._one_object_type

    def property_map(self):
        """Return the many-side -> one-side property mapping."""
        return self._property_map

    def info(self) -> str:
        """Return a one-line description naming both object types."""
        many_name = self._many_object_type._name
        one_name = self._one_object_type._name
        return f'Relation "{self.readable_id()}" maps "{many_name}" to "{one_name}"'
class IntermediaryRelation(Relation):
    """Relation between two object types that goes through an intermediary object type.

    Besides the three object types, it stores the readable ids of the two
    relations passed to the constructor as ``relation_a_rid`` and
    ``relation_b_rid``.
    """

    def __init__(self, guid, rid, role_a_object_type: ObjectType, role_b_object_type: ObjectType,
                 intermediary_object_type: ObjectType, relation_a_rid: str, relation_b_rid: str):
        super().__init__(guid, rid)
        self._role_a_object_type = role_a_object_type
        self._role_b_object_type = role_b_object_type
        self._intermediary_object_type = intermediary_object_type
        self._relation_a = relation_a_rid
        self._relation_b = relation_b_rid

    def role_a_player(self):
        """Return the object type playing role A."""
        return self._role_a_object_type

    def role_b_player(self):
        """Return the object type playing role B."""
        return self._role_b_object_type

    def intermediary_player(self):
        """Return the intermediary object type."""
        return self._intermediary_object_type

    def relation_a(self):
        """Return the id given as ``relation_a_rid``."""
        return self._relation_a

    def relation_b(self):
        """Return the id given as ``relation_b_rid``."""
        return self._relation_b

    def info(self):
        """Return a one-line description with every name in balanced quotes."""
        return (f'Relation "{self.readable_id()}" associates "{self.role_a_player()._name}" with '
                f'"{self.role_b_player()._name}" via intermediary player "{self.intermediary_player()._name}" and '
                f'relations "{self.relation_a()}" and "{self.relation_b()}"')
+# These helper functions normalize these inconsistencies to make parsing easier. + +def norm(v: Any) -> str | None: + if not isinstance(v, str): + return None + return v if v.strip() else None + +def set_if_value(curr: str | None, new_val: str | None) -> str | None: + # Only set when new_val is not None (i.e., not empty string or missing) + return new_val if new_val is not None else curr + +def get_dict(d, key): + v = d.get(key) + return v if isinstance(v, dict) else {} + +def get_list(d, key): + v = d.get(key) + return v if isinstance(v, list) else [] + +# DataSets in Palantir have their own JSON format that is separate from the Ontology JSON format. +class PalantirDataSetParser: + + _model: DataSetModel + + def __init__(self): + self._model = DataSetModel() + + def model(self): + return self._model + + def _dataset_from_dict(self, d: dict[str, Any], registry: dict[str, DataSet]) -> DataSet | None: + ds_id = norm(d.get("mainDatasetId")) + + if not ds_id: + return None + + # Reuse or create instance; do not return early to ensure children are populated + ds = registry.get(ds_id, None) + if ds is None: + ds = DataSet(ds_id, norm(d.get("datasetName"))) + registry[ds_id] = ds + + # Scalars without overriding with empty values + ds._path = set_if_value(ds.path(), norm(d.get("datasetPath"))) + ds._readable_id = set_if_value(ds.readable_id(), norm(d.get("datasetName"))) + ds._description = set_if_value(ds.description(), norm(d.get("description"))) + + # Columns + ds_schema = d.get("datasetSchema") + if isinstance(ds_schema, list): + cols: list[DataSetColumn] = [] + for item in ds_schema: + if isinstance(item, dict): + cols.append(DataSetColumn(item.get("name"), item.get("type"), ds)) + ds._columns = cols # only when provided as a proper list + + # Dependencies + raw_inputs = d.get("inputDatasetIds") + if isinstance(raw_inputs, list): + inputs: list[DataSet] = [] + for item in raw_inputs: + if not isinstance(item, dict): + continue + child = self._dataset_from_dict(item, 
def parse(self, file: IOBase):
    """Load a JSON array of dataset descriptions from ``file`` into the model.

    Array items that are not objects are skipped. Each object is passed to
    ``_dataset_from_dict`` with a registry shared across the whole file, and
    every truthy result is stored in the model's data-set map under its guid.

    Raises:
        ValueError: if the top-level JSON value is not an array.
    """
    payload = json.load(file)
    if not isinstance(payload, list):
        raise ValueError("Top-level JSON must be an array of datasets")

    seen: dict[str, DataSet] = {}
    for entry in payload:
        if not isinstance(entry, dict):
            continue
        parsed = self._dataset_from_dict(entry, seen)
        if not parsed:
            continue
        self.model().data_sets_map()[parsed.guid()] = parsed
def _parse_object_type_name(self, raw_ot):
    """Return the display name of a raw ObjectType, whatever the JSON convention.

    Newer exports carry the name in ``displayMetadata.displayName``. If that
    is absent or blank, the top-level ``displayName`` is used instead, even
    when a ``displayMetadata`` section exists.

    Raises:
        ValueError: if neither location holds a non-blank name.
    """
    nested_name = norm(get_dict(raw_ot, "displayMetadata").get("displayName"))
    # norm() yields None or a non-blank string, so `or` only falls through
    # when the nested name is missing.
    ot_name = nested_name or norm(raw_ot.get("displayName"))

    if ot_name is None:
        raise ValueError(f'Could not extract a name from ObjectType with rid: {raw_ot.get("rid")}')
    return ot_name
+ # Really old format doesn't even have column/datasource info - so use property name as column + source = get_dict(raw_prop, "source") + column_name = norm(source.get("columnName")) or norm(raw_prop.get("column")) or property_id + backing_datasource_id = norm(source.get("datasourceBackingResourceRid")) or norm(raw_prop.get("datasourceRid")) or object_type_id + + return (column_name, backing_datasource_id) + + def _parse_object_types(self, data: dict) -> tuple[dict[str, ObjectType], dict[str, ObjectType]]: + object_types = {} + object_types_by_readable_id = {} + for raw_ot in get_list(data, "objectTypes"): + guid = norm(raw_ot.get("rid")) + if not guid: + raise ValueError("Object type `rid` field must be non-empty") + # Support both formats: new (id) and old (apiName) + readable_id = norm(raw_ot.get("id")) or norm(raw_ot.get("apiName")) + + # Extract the ObjectType's name + ot_name = self._parse_object_type_name(raw_ot) + + object_type = ObjectType(guid, readable_id, ot_name) + + object_type._type_groups = get_list(raw_ot, "typeGroups") + + status_message = get_dict(raw_ot, "status") + if status_message: + object_type.set_status(self._get_status(norm(status_message.get("type")))) + + object_types[guid] = object_type + object_types_by_readable_id[readable_id] = object_type + + data_sources = get_list(raw_ot, "datasources") + if len(data_sources) < 1: + # No backing datasource? This is common in old versions of the JSON. + # Then create one that uses the same identifier as the ObjectType it backs. 
+ object_type._data_sources.append(DataSource(readable_id, readable_id)) + else: + for data_source in data_sources: + datasource_rid = norm(data_source.get("datasourceRid")) + backing_resource_rid = norm(data_source.get("backingResourceRid")) + + if not datasource_rid or not backing_resource_rid: + raise ValueError("Object type fields `datasourceRid` and `backingResourceRid` must be non-empty") + + object_type._data_sources.append(DataSource(backing_resource_rid, datasource_rid)) + + + properties = {} + properties_by_readable_id = {} + + # Support both formats: list (new) and dict (old) + raw_properties = raw_ot.get("properties", []) + if isinstance(raw_properties, dict): + # Old format: properties is a dict keyed by property name + raw_properties = list(raw_properties.values()) + + for raw_prop in raw_properties: + # Parse type (supports nested arrays) + # Old format uses 'dataType', new uses 'baseType' + raw_base_type = get_dict(raw_prop, "baseType") or get_dict(raw_prop, "dataType") + prop_type = self._parse_datatype_node(raw_base_type) + + # Support both formats: new (id) and old (apiName) + prop_id = norm(raw_prop.get("id")) or norm(raw_prop.get("apiName")) + prop_guid = norm(raw_prop.get("rid")) + if not prop_guid or not prop_id: + warnings.warn(f"Skipping property with missing id/rid in object type '{ot_name}'") + continue + + (column_name, backing_datasource_id) = self._parse_property_backing_data(raw_prop, prop_id, guid) + + prop_name = prop_id + prop = Property(prop_guid, prop_name, prop_type, object_type, column_name, backing_datasource_id) + + status_message = get_dict(raw_prop, "status") + if status_message: + prop.set_status(self._get_status(norm(status_message.get("type")))) + + # This information exists only in the latest exports + primary_key_mapping = get_dict(raw_prop, "primaryKeyMapping") + if primary_key_mapping: + pk_mapping = {} + for k,v in primary_key_mapping.items(): + pk_column_name = norm(v.get("columnName")) + pk_mapping[k] = 
pk_column_name + prop._pk_mapping = pk_mapping + + properties[prop_guid] = prop + properties_by_readable_id[prop_name] = prop + + object_type._properties = properties + + pk_properties = set() + # Support both formats: primaryKeys (list) and primaryKey (string) + pk_list = get_list(raw_ot, "primaryKeys") + if not pk_list: + single_pk = norm(raw_ot.get("primaryKey")) + if single_pk: + pk_list = [single_pk] + for raw_pk_prop in pk_list: + pk_property = properties_by_readable_id.get(raw_pk_prop, None) + if pk_property is None: + warnings.warn(f"Property '{raw_pk_prop}' is not defined in object type '{ot_name}' - skipping as primary key") + continue + pk_properties.add(pk_property) + + object_type._pk_properties = pk_properties + + return object_types, object_types_by_readable_id + + + def _parse_raw_relation_id(self, raw_relation): + return norm(raw_relation.get("id")) or norm(raw_relation.get("apiName")) + + def _parse_raw_relation_guid(self, raw_relation): + return norm(raw_relation.get("rid")) or norm(raw_relation.get("linkTypeRid")) + + # Assumes raw_relation is a "MANY" relation and looks to make sure that it is + # an alternative reading of a "ONE" relation + def _verify_alternative_reading_of(self, raw_relation, all_relations): + id = raw_relation.get("linkTypeRid") + for r in all_relations: + if r.get("cardinality") == 'ONE': + if r.get("linkTypeRid") == id: + return True + return False + + def _parse_source_and_target(self, raw_relation, object_types): + # In the old format, sourceObjectType/targetObjectType name the source and target + # using readings rather than guids. 
def _parse_old_style_relation(self, raw_relation, object_types):
    """Build a ManyToOneRelation from an old-style (definition-less) relation dict.

    The old JSON format only supports ManyToOne relations. The relation is built
    when the target object type has exactly one primary-key property and the
    ``foreignKeyPropertyApiName`` of the raw relation resolves to a property of
    the source object type.

    Returns:
        The ManyToOneRelation, or None (after emitting a warning) when the source
        or target object type cannot be resolved or no foreign key mapping exists.
    """
    relation_id = self._parse_raw_relation_id(raw_relation)
    source_type, target_type = self._parse_source_and_target(raw_relation, object_types)

    if not (source_type and target_type):
        warnings.warn(f"Skipping relation {self._parse_raw_relation_id(raw_relation)}: source or target object type not found")
        return None

    relation = None
    # The target must be identified by exactly one primary-key property.
    if len(target_type.primary_keys()) == 1:
        target_pk = next(iter(target_type.primary_keys()))

        # Name of the source property acting as foreign key to the target's primary key.
        fk_name = norm(raw_relation.get("foreignKeyPropertyApiName"))
        fk_property = None
        if fk_name is not None:
            fk_property = source_type.lookup_property_by_reading(fk_name)

        if fk_property is not None:
            guid = self._parse_raw_relation_guid(raw_relation)
            relation = ManyToOneRelation(guid, relation_id, source_type, target_type, {fk_property: target_pk})

    if relation is None:
        warnings.warn(f"Skipping relation {relation_id}: no foreign key mapping available.")
    return relation
ObjectType]) -> tuple[dict[str, Relation], dict[str, IntermediaryRelation]]: + relations = {} + intermediary_relations = {} + + all_relations = get_list(data, "relations") + for raw_relation in all_relations: + # Support both formats: new (id/rid) and old (apiName/linkTypeRid) + relation_id = self._parse_raw_relation_id(raw_relation) + relation_guid = self._parse_raw_relation_guid(raw_relation) + + if not relation_guid or not relation_id: + # Skip relations without proper identifiers (can happen with SDK-extracted ontologies) + warnings.warn(f"Skipping relation with missing id/rid: {raw_relation.get('apiName', 'unknown')}") + continue + + relation_type = None + definition = get_dict(raw_relation, "definition") + if definition: + relation_type = norm(definition.get("type")) + + relation: Relation | None = None + + if not definition: + cardinality = norm(raw_relation.get("cardinality")) + if cardinality == 'MANY': + if not self._verify_alternative_reading_of(raw_relation, all_relations): + warnings.warn(f'Encountered an unsupported ManyToMany relation {relation_id}') + continue + + # Otherwise, assume the cardinality is "ONE" + relation = self._parse_old_style_relation(raw_relation, object_types) + if relation is None: + continue + + elif relation_type and relation_type.lower() == "onetomany": + one_to_many_dict = get_dict(definition, "oneToMany") + relation = self._parse_many_to_one_relation(relation_guid, relation_id, one_to_many_dict, object_types) + elif relation_type and relation_type.lower() == "intermediary": + intermediary_dict = get_dict(definition, "intermediary") + relation = self._parse_intermediary_relation(relation_guid, relation_id, intermediary_dict, object_types) + else: + many_to_many_dict = get_dict(definition, "manyToMany") + relation = self._parse_many_to_many_relation(relation_guid, relation_id, many_to_many_dict, object_types) + + status_message = get_dict(raw_relation, "status") + if status_message: + 
def _parse_many_to_one_relation(self, guid: str, id: str, raw: dict[Any, Any], object_types: dict[str, ObjectType]) -> Relation:
    """Build a ManyToOneRelation from a ``oneToMany`` relation definition.

    Args:
        guid: resource id of the relation.
        id: readable id of the relation.
        raw: the ``oneToMany`` dict of the relation definition.
        object_types: object types keyed by resource id.

    Returns:
        A ManyToOneRelation whose property map goes from each many-side property
        to the one-side property it refers to.

    Raises:
        ValueError: if an object type rid is missing or unknown, the
            ``oneSidePrimaryKeyToManySidePropertyMapping`` is missing or empty,
            or a mapped property is not defined on its object type.
    """
    one_object_type_rid = norm(raw.get("objectTypeRidOneSide"))
    many_object_type_rid = norm(raw.get("objectTypeRidManySide"))
    if not one_object_type_rid or not many_object_type_rid:
        raise ValueError("ManyToOne relation is missing objectTypeRid fields")

    try:
        one_object_type = object_types[one_object_type_rid]
        many_object_type = object_types[many_object_type_rid]
    except KeyError as e:
        raise ValueError(f"Object type {e.args[0]} is not defined") from None

    one_to_many_mapping = get_dict(raw, "oneSidePrimaryKeyToManySidePropertyMapping")
    if not one_to_many_mapping:
        raise ValueError("Relation definition must contain `oneSidePrimaryKeyToManySidePropertyMapping`")

    def lookup_property(object_type: ObjectType, property_rid: str) -> Property:
        # A KeyError only carries the missing key (args[0]); the owning object type
        # has to come from the lookup context. Reading e.args[1] would raise
        # IndexError and hide the intended ValueError.
        try:
            return object_type.properties()[property_rid]
        except KeyError:
            raise ValueError(
                f"Property {property_rid} is not defined in object type {object_type.guid()}") from None

    property_map: dict[Property, Property] = {}
    for one_property_rid, many_property_rid in one_to_many_mapping.items():
        one_property = lookup_property(one_object_type, one_property_rid)
        many_property = lookup_property(many_object_type, many_property_rid)
        property_map[many_property] = one_property

    return ManyToOneRelation(guid, id, many_object_type, one_object_type, property_map)


def _parse_many_to_many_relation(self, guid: str, id: str, raw: dict[Any, Any], object_types: dict[str, ObjectType]) -> Relation:
    """Build a ManyToManyRelation from a ``manyToMany`` relation definition.

    Args:
        guid: resource id of the relation.
        id: readable id of the relation.
        raw: the ``manyToMany`` dict of the relation definition.
        object_types: object types keyed by resource id.

    Returns:
        A ManyToManyRelation with both primary-key property maps and the backing
        datasource / dataset ids of its single join table.

    Raises:
        ValueError: if an object type rid is missing or unknown, a primary-key
            mapping is missing or empty, a mapped property is not defined on its
            object type, or there is not exactly one well-formed
            ``joinTableDatasource`` entry.
    """
    role_a_object_type_rid = norm(raw.get("objectTypeRidA"))
    role_b_object_type_rid = norm(raw.get("objectTypeRidB"))
    if not role_a_object_type_rid or not role_b_object_type_rid:
        raise ValueError("ManyToMany relation is missing objectTypeRid fields")

    try:
        role_a_object_type = object_types[role_a_object_type_rid]
        role_b_object_type = object_types[role_b_object_type_rid]
    except KeyError as e:
        raise ValueError(f"Object type {e.args[0]} is not defined") from None

    def build_property_map(object_type, pk_mapping: dict[str, str]) -> dict[Property, str]:
        prop_map: dict[Property, str] = {}
        for src_prop_id, dst_prop_id in pk_mapping.items():
            try:
                obj_prop = object_type.properties()[src_prop_id]
            except KeyError:
                # KeyError.args holds only the missing key; name the object type
                # from the enclosing context instead of the non-existent args[1].
                raise ValueError(
                    f"Property {src_prop_id} is not defined in object type {object_type.guid()}") from None
            prop_map[obj_prop] = dst_prop_id
        return prop_map

    role_a_pk_mapping = get_dict(raw, "objectTypeAPrimaryKeyPropertyMapping")
    if not role_a_pk_mapping:
        raise ValueError("Relation definition must contain `objectTypeAPrimaryKeyPropertyMapping`")

    role_a_property_map: dict[Property, str] = build_property_map(role_a_object_type, role_a_pk_mapping)

    role_b_pk_mapping = get_dict(raw, "objectTypeBPrimaryKeyPropertyMapping")
    if not role_b_pk_mapping:
        raise ValueError("Relation definition must contain `objectTypeBPrimaryKeyPropertyMapping`")

    role_b_property_map: dict[Property, str] = build_property_map(role_b_object_type, role_b_pk_mapping)

    relation = ManyToManyRelation(guid, id, role_a_object_type, role_b_object_type, role_a_property_map,
                                  role_b_property_map)

    join_table_data_source = get_list(raw, "joinTableDatasource")
    if len(join_table_data_source) != 1:
        raise ValueError("Relation definition must contain exactly one `joinTableDatasource`")

    datasource_rid = norm(join_table_data_source[0].get("datasourceRid"))
    backing_resource_rid = norm(join_table_data_source[0].get("backingResourceRid"))

    if not datasource_rid or not backing_resource_rid:
        raise ValueError("Relation fields `datasourceRid` and `backingResourceRid` must be non-empty")

    relation.set_backing_datasource_id(datasource_rid)
    relation.set_backing_dataset_id(backing_resource_rid)

    return relation
def model(self):
    """Return the ontology produced by the last successful ``parse()`` call.

    Raises:
        RuntimeError: if ``parse()`` has not stored a model on this instance yet.
    """
    # `_model` is only declared as a class-level annotation, so the attribute does
    # not exist until parse() assigns it. getattr() keeps the documented
    # RuntimeError instead of leaking an AttributeError.
    parsed = getattr(self, "_model", None)
    if parsed is None:
        raise RuntimeError("You must call 'parse()' first before calling 'model()'")
    return parsed
def _parse_from_zip(self, zf: zipfile.ZipFile):
    """Parse the datasets and the ontology JSON out of an opened ZIP archive.

    Steps, in order:
      1. validate the archive layout via ``_validate_archive``;
      2. parse every JSON file yielded for the ``data_sets`` directory and merge
         the resulting dataset maps;
      3. parse the single top-level ontology JSON and attach the datasets;
      4. link object types and many-to-many relations to their datasets;
      5. store the finished model on ``self._model``.

    Raises:
        ValueError: if no JSON file is yielded for the ``data_sets`` directory.
        FileNotFoundError: re-raised (message text only) when opening the
            ontology JSON fails with FileNotFoundError.
    """
    self._validate_archive(zf)

    any_json = False
    data_sets: dict[str, DataSet] = {}
    # Each yielded item is a (name, file handle) pair; the handle is closed here.
    for name, fh in iter_json_files_from_dir_in_zip(zf, "data_sets"):
        any_json = True
        try:
            # A fresh dataset parser per file; its datasets are merged by key.
            parser = PalantirDataSetParser()
            parser.parse(fh)
            data_sets.update(parser.model().data_sets_map())
        finally:
            fh.close()
    if not any_json:
        raise ValueError("'data_sets' folder contains no JSON files")

    try:
        with open_top_level_file_from_zip(zf, self._get_ontology_json_file_path(zf)) as fh:
            # _make_ontology_parser is the hook that decides which ontology parser is used.
            parser = self._make_ontology_parser()
            parser.parse(fh)
            model = parser.model()
            model.set_data_sets(data_sets)

            for ot in model.object_types().values():
                for ds in ot.data_sources():
                    # First try the dataset id recorded on the datasource itself.
                    data_set = data_sets.get(ds.backing_dataset_id(), None)
                    if data_set is None:
                        # For SDK-extracted ontologies with synthetic datasources,
                        # mainDatasetId in data_sets JSON matches the object type's RID
                        data_set = data_sets.get(ot.guid(), None)
                    if data_set:
                        ot.sync_from_data_set(data_set)
                        # For SDK-extracted ontologies, property column_name defaults to
                        # the apiName (camelCase), but dataset columns use snake_case.
                        # Cross-reference to use the actual dataset column names.
                        ds_col_names = {col.name() for col in data_set.columns()}
                        for prop in ot.properties().values():
                            col_name = prop.column_name()
                            if col_name not in ds_col_names:
                                # Only rewrite when the snake_case form really is a dataset column.
                                snake_name = camel_to_snake(col_name)
                                if snake_name in ds_col_names:
                                    prop._column_name = snake_name
            for rel in model.relations().values():
                if isinstance(rel, ManyToManyRelation):
                    # May be None when the backing dataset is not part of the archive.
                    rel._data_set = data_sets.get(rel.backing_dataset_id(), None)

            self._model = model
    except FileNotFoundError as e:
        # The new exception carries only str(e); the original is chained as its cause.
        raise FileNotFoundError(str(e)) from e
+ Accept either: + - Top-level 'data_sets/' folder, or + - A single-root folder with 'root/data_sets/' inside. + """ + names = zf.namelist() + + # Fast path: direct presence at top-level or files under 'data_sets/' + has_data_sets = any( + n.endswith("/") and n.rstrip("/").endswith("data_sets") for n in names + ) or any(n.startswith("data_sets/") for n in names) + if has_data_sets: + return + + # Single-root archives: if there's exactly one root folder, allow root/data_sets/ + roots = {n.split("/", 1)[0] for n in names if "/" in n} + if len(roots) == 1: + root = next(iter(roots)) + has_rooted_data_sets = any( + n.endswith("/") and n.rstrip("/").endswith(f"{root}/data_sets") for n in names + ) or any(n.startswith(f"{root}/data_sets/") for n in names) + if has_rooted_data_sets: + return + + raise ValueError("Archive does not contain required 'data_sets' folder") + + def _get_ontology_json_file_path(self, zf: zipfile.ZipFile) -> str: + """ + Find exactly one top-level JSON file and return its archive path. + + Rules: + - "Top-level" means entries without '/' in their name. + - If the archive is packaged under a single root directory, then + "top-level" means entries directly under that root (exactly one '/'). + - There must be exactly one JSON at this level; otherwise raise. + """ + names = zf.namelist() + + # Identify entries without any parent directory. + top_level = [n for n in names if "/" not in n] + + # If nothing is at the real top-level, accept the case of a single root folder. 
+ if not top_level: + roots = {n.split("/", 1)[0] for n in names if "/" in n} + if len(roots) == 1: + root = next(iter(roots)) + # Entries directly under the single root (e.g., 'root/file.json') + top_level = [n for n in names if n.startswith(f"{root}/") and n.count("/") == 1] + + # Keep only JSON files + json_candidates = [n for n in top_level if n.lower().endswith(".json")] + + # Enforce exactly one ontology JSON at the top level + if len(json_candidates) == 0: + raise ValueError("Archive must contain exactly one top-level JSON file (none found)") + if len(json_candidates) > 1: + raise ValueError("Archive must contain exactly one top-level JSON file (multiple found)") + + return json_candidates[0] diff --git a/converters/ontology/src/osi/model.py b/converters/ontology/src/osi/model.py new file mode 100644 index 0000000..5b35492 --- /dev/null +++ b/converters/ontology/src/osi/model.py @@ -0,0 +1,1128 @@ +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Protocol + +# --------------------------------------------------------------------------- +# Builtin concept names. 
class Concept:
    """A type-like node of the ontology.

    A concept is either an EntityType (a real-world object, referenced through
    identifying relationships) or a ValueType (primitive-ish, transitively
    extending a built-in value type).
    """
    _name: str
    _type: ConceptType | None
    _description: str | None
    _builtin: bool
    _extends: list[Concept]
    _identify_by: dict[str, Relationship]
    _derived_by: list[Formula]
    _requires: list[Formula]

    def __init__(
            self,
            name: str,
            type: ConceptType | None = None,
            description: str | None = None,
            builtin: bool = False,
            extends: list[Concept] | None = None,
            identify_by: dict[str, Relationship] | None = None,
            derived_by: list[Formula] | None = None,
            requires: list[Formula] | None = None,
    ):
        self._name = name
        self._type = type
        self._description = description
        self._builtin = builtin
        # Missing or empty collections are replaced by fresh empty ones.
        self._extends = extends or []
        self._identify_by = identify_by or {}
        self._derived_by = derived_by or []
        self._requires = requires or []

    # -- plain accessors ----------------------------------------------------

    @property
    def name(self) -> str:
        return self._name

    @property
    def type(self) -> ConceptType | None:
        return self._type

    @property
    def description(self) -> str | None:
        return self._description

    # -- classification -----------------------------------------------------

    @property
    def is_builtin(self) -> bool:
        return self._builtin

    @property
    def is_value_type(self) -> bool:
        return self._type == ConceptType.VALUE_TYPE

    @property
    def is_entity_type(self) -> bool:
        return self._type == ConceptType.ENTITY_TYPE

    @property
    def is_primitive(self) -> bool:
        """True for built-ins and for concepts whose single parent is primitive."""
        if self.is_builtin:
            return True
        parents = self._extends
        if not parents or len(parents) != 1:
            return False
        return parents[0].is_primitive

    @property
    def is_derived(self) -> bool:
        return True if self._derived_by else False

    # -- collections (returned as copies) -----------------------------------

    @property
    def extends(self) -> list[Concept]:
        return [*self._extends]

    @property
    def identify_by(self) -> dict[str, Relationship]:
        return {**self._identify_by}

    @property
    def derived_by(self) -> list[Formula]:
        return [*self._derived_by]

    @property
    def requires(self) -> list[Formula]:
        return [*self._requires]

    # -- mutators -----------------------------------------------------------

    def add_require(self, require: Formula) -> None:
        self._requires.append(require)

    def add_derived_by(self, rule: Formula) -> None:
        self._derived_by.append(rule)

    def set_identify_by(self, identifiers: dict[str, Relationship]) -> None:
        self._identify_by = identifiers

    def extend(self, parent: Concept) -> None:
        self._extends.append(parent)

    def __str__(self) -> str:
        return self._name
class Role:
    """One position of a relationship, played by a concept.

    A role records the relationship it belongs to, the concept playing it, its
    position (``idx``) inside the relationship and an optional explicit name.
    """
    _part_of: Relationship
    _player: Concept
    _name: str | None
    _sibling: Role | None
    _idx: int

    def __init__(self, part_of: Relationship, player: Concept, idx: int, name: str | None = None):
        self._part_of = part_of
        self._player = player
        self._idx = idx
        self._name = name
        self._sibling = None  # resolved lazily by the `sibling` property

    @property
    def player(self) -> Concept:
        return self._player

    @property
    def idx(self) -> int:
        return self._idx

    @property
    def name(self) -> str:
        """The explicit role name, or the player's name when none was given."""
        return self._name or self._player.name

    @property
    def explicit_name(self) -> str | None:
        return self._name

    @property
    def part_of(self) -> Relationship:
        return self._part_of

    @property
    def sibling(self) -> Role | None:
        """The other role of a binary relationship; None for any other arity."""
        if self._sibling is None and self._part_of.binary:
            first_role, second_role = self._part_of.roles
            self._sibling = second_role if self == first_role else first_role
        return self._sibling

    @property
    def madlib(self) -> str:
        """``Player:name`` when the role is explicitly named, else just ``Player``."""
        return f"{self._player.name}:{self._name}" if self._name else self._player.name

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Role):
            return False
        # The position is part of a role's identity. Without it, the two unnamed
        # roles of a self-referential relationship (same player on both sides)
        # compare equal, and `sibling` of the second role resolves to itself.
        return (
            self._part_of == other._part_of
            and self._idx == other._idx
            and self._player == other._player
            and self._name == other._name
        )

    def __hash__(self) -> int:
        # Kept in sync with __eq__: equal roles must hash equally.
        return hash((self._part_of, self._idx, self._player, self._name))
@dataclass
class DialectExpressionSet:
    """Runtime counterpart of spec.Expression: one logical expression rendered
    in one or more dialects."""
    dialects: list[DialectExpression] = field(default_factory=list)

    def by_dialect(self, dialect: str) -> DialectExpression | None:
        """Return the first rendering whose dialect equals ``dialect``, else None."""
        matches = (candidate for candidate in self.dialects if candidate.dialect == dialect)
        return next(matches, None)

    @property
    def primary(self) -> DialectExpression | None:
        """The first rendering in the list, or None when there is none."""
        if not self.dialects:
            return None
        return self.dialects[0]
def sanitize_identifier(ref: str) -> str:
    """Return *ref* with every character outside ``[A-Za-z0-9_]`` replaced by ``_``."""
    non_identifier_char = r"[^A-Za-z0-9_]"
    return re.sub(non_identifier_char, "_", ref)
class JoinPath:
    """Foreign-key style join between two Datasets (runtime form of spec.JoinPath).

    ``from_columns`` are matched against ``to_columns``, so the constructor
    rejects column lists of different length.
    """
    _name: str
    _from_dataset: Dataset
    _to_dataset: Dataset
    _from_columns: list[DatasetField]
    _to_columns: list[DatasetField]
    _ai_context: AiContext | None
    _custom_extensions: list[CustomExtension]

    def __init__(
        self,
        name: str,
        from_dataset: Dataset,
        to_dataset: Dataset,
        from_columns: list[DatasetField],
        to_columns: list[DatasetField],
        ai_context: AiContext | None = None,
        custom_extensions: list[CustomExtension] | None = None,
    ):
        """Store the join definition.

        Raises:
            ValueError: if ``from_columns`` and ``to_columns`` differ in length.
        """
        arity_matches = len(from_columns) == len(to_columns)
        if not arity_matches:
            raise ValueError(
                f"JoinPath '{name}': from_columns/to_columns arity mismatch "
                f"({len(from_columns)} vs {len(to_columns)})"
            )
        self._name = name
        self._from_dataset, self._to_dataset = from_dataset, to_dataset
        self._from_columns, self._to_columns = from_columns, to_columns
        self._ai_context = ai_context
        # A missing (or empty) extension list is normalised to a fresh list.
        self._custom_extensions = custom_extensions if custom_extensions else []

    @property
    def name(self) -> str:
        """Name of the join path."""
        return self._name

    @property
    def from_dataset(self) -> Dataset:
        """Dataset on the referencing side of the join."""
        return self._from_dataset

    @property
    def to_dataset(self) -> Dataset:
        """Dataset on the referenced side of the join."""
        return self._to_dataset

    @property
    def from_columns(self) -> list[DatasetField]:
        """Shallow copy of the referencing columns."""
        return [*self._from_columns]

    @property
    def to_columns(self) -> list[DatasetField]:
        """Shallow copy of the referenced columns."""
        return [*self._to_columns]

    @property
    def ai_context(self) -> AiContext | None:
        """AI context passed to the constructor, if any."""
        return self._ai_context

    @property
    def custom_extensions(self) -> list[CustomExtension]:
        """Shallow copy of the custom extensions."""
        return [*self._custom_extensions]

    def __str__(self) -> str:
        return self._name
def add_dataset(self, dataset: Dataset) -> None:
    """Register *dataset* in insertion order and index it by its name.

    Raises:
        ValueError: if a dataset with the same name was already added.
    """
    by_name = self._dataset_name_map
    if dataset.name in by_name:
        raise ValueError(f"Dataset '{dataset.name}' already exists in logical model '{self._name}'")
    by_name[dataset.name] = dataset
    self._datasets.append(dataset)
@dataclass
class ObjectMapping:
    """Mapping to the objects of a concept.

    Exactly one of two forms must be supplied: a direct `expression` (value
    types / simple-id entities) or `referent_mappings` (compound id).

    `expression` holds the *parsed* mapping expression — a `DatasetField` when
    it resolves to a single field reference, a `Formula` for anything richer.
    Keeping the parsed form instead of the raw spec string lets callers
    inspect the mapping target; the forward converter parses the string and
    the reverse converter rebuilds it.
    """
    concept: Concept | None = None
    expression: DatasetField | Formula | None = None
    referent_mappings: list[ReferentMapping] | None = None

    def __post_init__(self) -> None:
        """Enforce the either/or rule; an empty list still counts as supplied."""
        supplied = (self.expression is not None, self.referent_mappings is not None)
        if all(supplied):
            raise ValueError("ObjectMapping must not have both expression and referent_mappings")
        if not any(supplied):
            raise ValueError("ObjectMapping must have either expression or referent_mappings")
class OntologyMapping:
    """Ties a logical (semantic) model to an ontology.

    Holds the concept mappings that declare how the model's fields populate
    the ontology's concepts and relationships. Mappings are added one at a
    time and kept in insertion order.
    """
    _name: str
    _description: str | None
    _ontology: OntologyComponent
    _semantic_model: SemanticModel
    _concept_mappings: list[ConceptMapping]

    def __init__(
        self,
        name: str,
        ontology: OntologyComponent,
        semantic_model: SemanticModel,
        description: str | None = None,
    ):
        """Create a mapping with no concept mappings yet."""
        self._concept_mappings = []
        self._semantic_model = semantic_model
        self._ontology = ontology
        self._description = description
        self._name = name

    @property
    def name(self) -> str:
        """Name of this mapping."""
        return self._name

    @property
    def description(self) -> str | None:
        """Optional free-text description."""
        return self._description

    @property
    def ontology(self) -> OntologyComponent:
        """Ontology component the mapping targets."""
        return self._ontology

    @property
    def semantic_model(self) -> SemanticModel:
        """Semantic model the mapping reads from."""
        return self._semantic_model

    @property
    def concept_mappings(self) -> list[ConceptMapping]:
        """Shallow copy of the concept mappings, in insertion order."""
        return [*self._concept_mappings]

    def add_concept_mapping(self, cm: ConceptMapping) -> None:
        """Append *cm*; no uniqueness check is performed."""
        self._concept_mappings.append(cm)
def add_concept(self, concept: Concept) -> None:
    """Register *concept*, index it by name and notify every observer.

    Raises:
        ValueError: if a concept with the same name is already registered;
            in that case nothing is stored and no observer is called.
    """
    known = self._concept_name_map
    if concept.name in known:
        raise ValueError(f"Concept '{concept.name}' already exists in the ontology")
    known[concept.name] = concept
    self._concepts.append(concept)
    # Observers are called in registration order, after the concept is stored.
    for listener in self._observers:
        listener.on_concept_added(concept)
def lookup_concept_relationship(self, concept: Concept, name: str) -> Relationship | None:
    """Find relationship *name* on *concept* or on a concept it extends.

    The relationship map is keyed by ``"<concept name>.<relationship name>"``.
    The concept itself is tried first, then each entry of ``extends`` depth
    first and in declaration order; the first hit is returned.

    Every concept is visited at most once. A cyclic ``extends`` chain
    therefore ends with ``None`` instead of exhausting the recursion limit,
    and an ancestor reachable along several paths is not searched again.

    Returns:
        The matching relationship, or ``None`` when nothing in the
        hierarchy declares it.
    """
    visited: set[int] = set()
    pending = [concept]
    while pending:
        current = pending.pop()
        # Track by identity so the walk does not depend on Concept equality.
        if id(current) in visited:
            continue
        visited.add(id(current))
        rel = self._relationship_name_map.get(f"{current.name}.{name}")
        if rel:
            return rel
        # Reversed push keeps the left-to-right, depth-first search order.
        pending.extend(reversed(list(current.extends)))
    return None
self._ontology_mapping_index: + raise ValueError(f"OntologyMapping '{mapping.name}' already exists in model") + self._ontology_mappings.append(mapping) + self._ontology_mapping_index[mapping.name] = mapping + + @property + def ontology_mappings(self) -> list[OntologyMapping]: + return list(self._ontology_mappings) + + +# --------------------------------------------------------------------------- +# Verbalization parser (handles a list of verbalization patterns) +# --------------------------------------------------------------------------- + +@dataclass +class Verbalization: + text: str + + +@dataclass +class RelationshipVerbalization(Verbalization): + roles: list[VerbalizationRole] + + +@dataclass +class VerbalizationRole: + concept: Concept + name: str | None = None + preceding_text: str | None = None + prefix: str | None = None + following_text: str | None = None + postfix: str | None = None + + def verbalization_name(self) -> str: + return f"{{{self.concept.name}:{self.name}}}" if self.name else f"{{{self.concept.name}}}" + + +_CONCEPT_TOKEN_RE = re.compile(r"\{([^:}]+?)(?::([^}]+))?\}") + + +def parse_verbalizations( + relationship: Relationship, verbalizations: list[str] | None +) -> list[RelationshipVerbalization]: + if not verbalizations: + return [_build_verbalization(relationship)] + return [_parse_verbalization(relationship, v) for v in verbalizations] + + +def _build_verbalization(relationship: Relationship) -> RelationshipVerbalization: + roles: list[VerbalizationRole] = [] + parts: list[str] = [] + for role in relationship.roles: + vr = VerbalizationRole(concept=role.player, name=role.explicit_name) + roles.append(vr) + parts.append(vr.verbalization_name()) + if relationship.unary: + return RelationshipVerbalization(text=f"{relationship.name} {parts[0]}", roles=roles) + return RelationshipVerbalization(text=" has ".join(parts), roles=roles) + + +def _parse_verbalization(relationship: Relationship, verbalization: str) -> RelationshipVerbalization: + """ 
+ Parse a verbalization string into an ordered list of :class:`VerbalizationRole` objects. + + Format example: + + 'every chain- super {Store} reports returns of {Item} big -box for average- {Amount:amt}' + + The string may contain any number of ``{Concept}`` / ``{Concept:roleName}`` tokens. + Text between tokens is split uniformly by :func:`_split_segment` into + ``(postfix, middle, prefix)`` and assigned to the adjacent roles: + + +-----------+------------------------+------------------------------+----------------------+ + | position | postfix | middle | prefix | + +===========+========================+==============================+======================+ + | segment 0 | *ignored* | → roles[0].preceding_text | → roles[0].prefix | + +-----------+------------------------+------------------------------+----------------------+ + | segment i | → roles[i-1].postfix | → roles[i-1].following_text | → roles[i].prefix | + +-----------+------------------------+------------------------------+----------------------+ + | last seg | → roles[-1].postfix | → roles[-1].following_text | *ignored* | + +-----------+------------------------+------------------------------+----------------------+ + """ + tokens = list(_CONCEPT_TOKEN_RE.finditer(verbalization)) + if len(tokens) != relationship.arity: + raise ValueError( + f"Number of roles in verbalization '{verbalization}' for relationship " + f"{relationship.full_name} don't match" + ) + segments: list[str] = [] + roles: list[VerbalizationRole] = [] + prev_end = 0 + for idx, m in enumerate(tokens): + role = relationship.role(idx) + segments.append(verbalization[prev_end:m.start()].strip()) + verb_concept_name = m.group(1).strip() + rel_role_name = role.explicit_name + verb_role_name = m.group(2).strip() if m.group(2) else None + if rel_role_name != verb_role_name or role.player.name != verb_concept_name: + raise ValueError( + f"Role {idx}: '{role.player.name}:{role.name}' " + f"does not match verbalization role 
'{verb_concept_name}:{verb_role_name}'" + ) + roles.append(VerbalizationRole(concept=role.player, name=verb_role_name)) + prev_end = m.end() + segments.append(verbalization[prev_end:].strip()) + for i, seg in enumerate(segments): + if not seg: + continue + postfix, middle, prefix = _split_segment(seg) + if i == 0: + roles[0].preceding_text = middle + roles[0].prefix = prefix + elif i == len(tokens): + roles[-1].postfix = postfix + roles[-1].following_text = middle + else: + roles[i - 1].postfix = postfix + roles[i - 1].following_text = middle + roles[i].prefix = prefix + return RelationshipVerbalization(text=verbalization, roles=roles) + + +def _split_segment(segment: str) -> tuple[str | None, str | None, str | None]: + words = segment.split() + if not words: + return None, None, None + + postfix_end = 0 + if any(w.startswith("-") for w in words): + postfix_end = 1 + while postfix_end < len(words) and words[postfix_end].startswith("-"): + postfix_end += 1 + + prefix_start = len(words) + for i in range(postfix_end, len(words)): + if words[i].endswith("-"): + prefix_start = i + break + + postfix = " ".join(w.lstrip("-") for w in words[:postfix_end]) if postfix_end > 0 else None + prefix = " ".join(w.rstrip("-") for w in words[prefix_start:]) if prefix_start < len(words) else None + middle = " ".join(words[postfix_end:prefix_start]) if postfix_end < prefix_start else None + + return postfix, middle, prefix diff --git a/converters/ontology/src/osi/parser/__init__.py b/converters/ontology/src/osi/parser/__init__.py new file mode 100644 index 0000000..7c6d846 --- /dev/null +++ b/converters/ontology/src/osi/parser/__init__.py @@ -0,0 +1,49 @@ +"""Entrypoint: read a YAML/JSON OSI spec and produce a OsiOntology.""" + +from __future__ import annotations + +import json +from io import IOBase + +import yaml + +from osi.converter.spec_to_osi.converter import SpecToOsiConverter +from osi.model import OsiOntology +from osi.spec import OsiSpec + + +class OsiParser: + _model: 
@staticmethod
def load_data(file: IOBase):
    """Read *file* completely and parse it as JSON or YAML.

    The content is parsed with ``json.loads`` when the stream's ``name``
    ends with ``.json`` (case-insensitive); anything else, including a
    stream without a name, goes through ``yaml.safe_load``.

    After reading, the stream is rewound to the start so it can be read
    again. Streams that report themselves as non-seekable (pipes, stdin)
    are left where they are instead of raising after the content has
    already been read.

    Returns:
        The parsed document, as produced by the JSON or YAML loader.
    """
    content = file.read()
    # Objects without a `seekable` method keep the original behaviour.
    seekable = getattr(file, "seekable", None)
    if seekable is None or seekable():
        file.seek(0)
    # `name` can be absent, an int file descriptor or a path object.
    name = str(getattr(file, "name", "") or "").lower()
    if name.endswith(".json"):
        return json.loads(content)
    return yaml.safe_load(content)
class Relationship(OsiObject):
    """A relationship grouped under its first-role concept.

    `roles` enumerates the *additional* roles (the first is the container
    concept). `multiplicity` constrains the last role; `OneToOne` is only
    valid for binary relationships. `verbalizes` is a list of natural-language
    patterns with `{Concept}` or `{Concept:role_name}` placeholders.
    `derived_by` and `requires` are raw expression strings (parsed elsewhere).
    """
    # The only required field; every list below defaults to empty.
    name: str
    description: str | None = None
    # Additional roles only — the enclosing concept is the implicit first role.
    roles: list[Role] = Field(default_factory=list)
    # Natural-language patterns with `{Concept}` / `{Concept:role_name}` placeholders.
    verbalizes: list[str] = Field(default_factory=list)
    # Restricted to the two listed literals; None when not specified.
    # NOTE(review): the "OneToOne only for binary relationships" rule from the
    # docstring is not enforced by this model — confirm where it is checked.
    multiplicity: Literal["OneToOne", "ManyToOne"] | None = None
    # Raw expression strings, kept unparsed at this level.
    derived_by: list[str] = Field(default_factory=list)
    requires: list[str] = Field(default_factory=list)
class Dataset(OsiObject):
    """A logical dataset (fact or dimension table) backed by `source` — a
    physical table/view reference or a query."""
    # `name` and `source` are the only required fields.
    name: str
    source: str
    # Keys are plain lists of strings; `unique_keys` is a list of such lists.
    # NOTE(review): presumably these strings are names of entries in `fields` —
    # this model does not validate that; confirm against the converter.
    primary_key: list[str] | None = None
    unique_keys: list[list[str]] | None = None
    description: str | None = None
    # Either a bare string or a free-form dict (see the AiContext alias).
    ai_context: AiContext | None = None
    # Both lists default to empty when omitted from the document.
    fields: list[DatasetField] = Field(default_factory=list)
    custom_extensions: list[CustomExtension] = Field(default_factory=list)
class SemanticModel(OsiObject):
    """A complete logical/semantic model (the body that the core spec calls
    `semantic_model`): datasets plus the join paths and metrics defined over
    them. One or more SemanticModels can feed a single OntologyMapping."""
    # `name` is the only required field; every list defaults to empty.
    name: str
    description: str | None = None
    # Either a bare string or a free-form dict (see the AiContext alias).
    ai_context: AiContext | None = None
    datasets: list[Dataset] = Field(default_factory=list)
    # Named `relationships` in the document, but the entries are JoinPath
    # objects (dataset-to-dataset joins), not ontology Relationship objects.
    relationships: list[JoinPath] = Field(default_factory=list)
    metrics: list[Metric] = Field(default_factory=list)
    custom_extensions: list[CustomExtension] = Field(default_factory=list)
class OsiSpec(OsiObject):
    """Root OSI document.

    Carries one ontology definition (a list of concept components) and the
    ontology mappings that wire semantic models into it.
    """
    version: str | None = None
    name: str
    description: str | None = None
    ai_context: AiContext | None = None
    ontology: list[ConceptComponent] = Field(default_factory=list)
    ontology_mappings: list[OntologyMapping] = Field(default_factory=list)

    @classmethod
    def load_yaml(cls, text: str) -> OsiSpec:
        """Parse YAML *text* with ``yaml.safe_load`` and validate it as a spec."""
        raw_document = yaml.safe_load(text)
        return cls.model_validate(raw_document)

    def dump_dict(self) -> dict:
        """Dump to a plain dict, using field aliases and omitting ``None`` and default values."""
        dump_options = {
            "exclude_none": True,
            "exclude_defaults": True,
            "by_alias": True,
        }
        return self.model_dump(**dump_options)

    def dump_yaml(self) -> str:
        """Serialise :meth:`dump_dict` to YAML, keeping the key order as dumped."""
        as_dict = self.dump_dict()
        return yaml.safe_dump(as_dict, sort_keys=False)