Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions converters/spark/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.osi</groupId>
<artifactId>osi-spark-converter</artifactId>
<version>0.1.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>OSI Spark Converter</name>
<description>Converts OSI semantic models to Apache Spark code</description>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>3.5.1</spark.version>
<snakeyaml.version>2.2</snakeyaml.version>
<junit.version>5.10.2</junit.version>
</properties>

<dependencies>
<!-- YAML parsing -->
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>${snakeyaml.version}</version>
</dependency>

<!-- Apache Spark (provided — the generated code runs on a Spark cluster) -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>

<!-- Test -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<archive>
<manifest>
<mainClass>org.osi.converter.spark.OsiSparkConverter</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package org.osi.converter.spark;

import org.osi.converter.spark.model.OsiModel;
import org.osi.converter.spark.model.OsiModel.*;
import org.yaml.snakeyaml.Yaml;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;


/**
* Parses an OSI YAML file into an {@link OsiModel}.
*/
public class OsiModelParser {

/**
* Parse an OSI YAML file from the given path.
*/
public OsiModel parse(Path yamlPath) throws IOException {
try (InputStream is = Files.newInputStream(yamlPath)) {
return parse(is);
}
}

/**
* Parse an OSI YAML file from an input stream.
*/
@SuppressWarnings("unchecked")
public OsiModel parse(InputStream is) {
Yaml yaml = new Yaml();
Map<String, Object> root = yaml.load(is);

OsiModel model = new OsiModel();
model.setVersion((String) root.get("version"));

List<Map<String, Object>> smList = (List<Map<String, Object>>) root.get("semantic_model");
if (smList == null) {
return model;
}

List<SemanticModel> semanticModels = new ArrayList<>();
for (Map<String, Object> smMap : smList) {
semanticModels.add(parseSemanticModel(smMap));
}
model.setSemanticModels(semanticModels);
return model;
}

@SuppressWarnings("unchecked")
private SemanticModel parseSemanticModel(Map<String, Object> map) {
SemanticModel sm = new SemanticModel();
sm.setName((String) map.get("name"));
sm.setDescription((String) map.get("description"));

// Datasets
List<Map<String, Object>> dsList = (List<Map<String, Object>>) map.get("datasets");
if (dsList != null) {
List<Dataset> datasets = new ArrayList<>();
for (Map<String, Object> dsMap : dsList) {
datasets.add(parseDataset(dsMap));
}
sm.setDatasets(datasets);
}

// Relationships
List<Map<String, Object>> relList = (List<Map<String, Object>>) map.get("relationships");
if (relList != null) {
List<Relationship> relationships = new ArrayList<>();
for (Map<String, Object> relMap : relList) {
relationships.add(parseRelationship(relMap));
}
sm.setRelationships(relationships);
}

// Metrics
List<Map<String, Object>> metricList = (List<Map<String, Object>>) map.get("metrics");
if (metricList != null) {
List<Metric> metrics = new ArrayList<>();
for (Map<String, Object> mMap : metricList) {
metrics.add(parseMetric(mMap));
}
sm.setMetrics(metrics);
}

return sm;
}

@SuppressWarnings("unchecked")
private Dataset parseDataset(Map<String, Object> map) {
Dataset ds = new Dataset();
ds.setName((String) map.get("name"));
ds.setSource((String) map.get("source"));
ds.setDescription((String) map.get("description"));

List<String> pk = (List<String>) map.get("primary_key");
if (pk != null) {
ds.setPrimaryKey(new ArrayList<>(pk));
}

List<Map<String, Object>> fieldList = (List<Map<String, Object>>) map.get("fields");
if (fieldList != null) {
List<Field> fields = new ArrayList<>();
for (Map<String, Object> fMap : fieldList) {
fields.add(parseField(fMap));
}
ds.setFields(fields);
}
return ds;
}

@SuppressWarnings("unchecked")
private Field parseField(Map<String, Object> map) {
Field field = new Field();
field.setName((String) map.get("name"));
field.setDescription((String) map.get("description"));

// Dimension
Map<String, Object> dim = (Map<String, Object>) map.get("dimension");
if (dim != null) {
Object isTime = dim.get("is_time");
field.setTime(Boolean.TRUE.equals(isTime));
}

// Expressions
field.setExpressions(parseDialectExpressions(map));
return field;
}

@SuppressWarnings("unchecked")
private Relationship parseRelationship(Map<String, Object> map) {
Relationship rel = new Relationship();
rel.setName((String) map.get("name"));
rel.setFrom((String) map.get("from"));
rel.setTo((String) map.get("to"));

List<String> fromCols = (List<String>) map.get("from_columns");
if (fromCols != null) {
rel.setFromColumns(new ArrayList<>(fromCols));
}
List<String> toCols = (List<String>) map.get("to_columns");
if (toCols != null) {
rel.setToColumns(new ArrayList<>(toCols));
}
return rel;
}

@SuppressWarnings("unchecked")
private Metric parseMetric(Map<String, Object> map) {
Metric metric = new Metric();
metric.setName((String) map.get("name"));
metric.setDescription((String) map.get("description"));
metric.setExpressions(parseDialectExpressions(map));
return metric;
}

@SuppressWarnings("unchecked")
private List<DialectExpression> parseDialectExpressions(Map<String, Object> map) {
List<DialectExpression> result = new ArrayList<>();
Map<String, Object> exprBlock = (Map<String, Object>) map.get("expression");
if (exprBlock == null) {
return result;
}
List<Map<String, Object>> dialects = (List<Map<String, Object>>) exprBlock.get("dialects");
if (dialects == null) {
return result;
}
for (Map<String, Object> d : dialects) {
String dialect = (String) d.get("dialect");
Object exprValue = d.get("expression");
String expression = exprValue != null ? exprValue.toString() : null;
result.add(new DialectExpression(dialect, expression));
}
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package org.osi.converter.spark;

import org.osi.converter.spark.model.OsiModel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
* CLI entry point: reads an OSI YAML semantic model and generates PySpark code.
*
* <pre>
* Usage:
* java -jar osi-spark-converter.jar &lt;osi_model.yaml&gt; [-o output.py] [-d DIALECT]
* </pre>
*/
public class OsiSparkConverter {

public static void main(String[] args) throws IOException {
if (args.length < 1) {
System.err.println("Usage: osi-spark-converter <osi_model.yaml> [-o output.py] [-d DIALECT]");
System.err.println();
System.err.println("Options:");
System.err.println(" -o FILE Write generated PySpark code to FILE (default: stdout)");
System.err.println(" -d DIALECT Preferred SQL dialect: ANSI_SQL, SNOWFLAKE, DATABRICKS (default: ANSI_SQL)");
System.exit(1);
}

String inputFile = args[0];
String outputFile = null;
String dialect = "ANSI_SQL";

for (int i = 1; i < args.length; i++) {
switch (args[i]) {
case "-o":
if (i + 1 < args.length) {
outputFile = args[++i];
}
break;
case "-d":
if (i + 1 < args.length) {
dialect = args[++i];
}
break;
default:
break;
}
}

// Parse the OSI model
OsiModelParser parser = new OsiModelParser();
OsiModel model = parser.parse(Paths.get(inputFile));

if (model.getSemanticModels().isEmpty()) {
System.err.println("Error: no semantic_model found in " + inputFile);
System.exit(1);
}

// Generate PySpark code
SparkCodeGenerator generator = new SparkCodeGenerator(dialect);
String code = generator.generate(model);

if (outputFile != null) {
Files.write(Paths.get(outputFile), code.getBytes(StandardCharsets.UTF_8));
System.out.println("Generated PySpark code written to " + outputFile);
} else {
System.out.println(code);
}
}
}
Loading