From 3bf160d226423c569b50b366e2ef4bc7da68ca49 Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Wed, 26 Nov 2025 00:15:18 -0800 Subject: [PATCH 1/7] Add SPI interfaces for external catalog compatibility testing This commit introduces Service Provider Interface (SPI) support that allows external catalog implementations to plug into Iceberg's test suite without modifying Iceberg source code. New SPI Interfaces: - TestTableProvider: Creates tables for iceberg-core tests - TestCatalogProvider: Provides catalog configurations for Spark tests - TestSparkSessionProvider: Provides custom SparkSession instances Modified Test Infrastructure: - TableTestBase: Discovers and uses external TestTableProvider via SPI - TestBase: Discovers and uses external TestSparkSessionProvider via SPI - TestBaseWithCatalog: Discovers and uses external TestCatalogProvider via SPI - CatalogTestBase: Respects external catalog configurations - ParameterizedTestExtension: Prefers most specific @Parameters method New Gradle Configuration: - spark/openhouse.gradle: Shared compatibility test task configuration - build.gradle: openhouseCompatibility aggregate task Usage: ./gradlew openhouseCompatibility -PopenhouseCompatibilityCoordinate=<group:artifact:version:classifier> This enables running 860+ Iceberg tests against external catalogs like OpenHouse without forking or patching Iceberg's test classes. 
--- .../iceberg/ParameterizedTestExtension.java | 42 ++++++- build.gradle | 58 +++++++++ .../org/apache/iceberg/TableTestBase.java | 70 +++++++++++ .../org/apache/iceberg/TestTableProvider.java | 61 +++++++++ gradle.properties | 1 + .../iceberg/hive/TestHiveMetastore.java | 6 + spark/openhouse.gradle | 75 +++++++++++ spark/v3.5/build.gradle | 119 +++++++++++++++++- .../spark/extensions/ExtensionsTestBase.java | 7 +- .../apache/iceberg/spark/CatalogTestBase.java | 72 +++++++---- .../org/apache/iceberg/spark/TestBase.java | 81 ++++++++++-- .../iceberg/spark/TestBaseWithCatalog.java | 93 +++++++++++++- .../iceberg/spark/TestCatalogProvider.java | 55 ++++++++ .../spark/TestSparkSessionProvider.java | 62 +++++++++ 14 files changed, 753 insertions(+), 49 deletions(-) create mode 100644 core/src/test/java/org/apache/iceberg/TestTableProvider.java create mode 100644 spark/openhouse.gradle create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestCatalogProvider.java create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionProvider.java diff --git a/api/src/test/java/org/apache/iceberg/ParameterizedTestExtension.java b/api/src/test/java/org/apache/iceberg/ParameterizedTestExtension.java index 59652bab98..789ae6b2a1 100644 --- a/api/src/test/java/org/apache/iceberg/ParameterizedTestExtension.java +++ b/api/src/test/java/org/apache/iceberg/ParameterizedTestExtension.java @@ -70,18 +70,16 @@ public boolean supportsTestTemplate(ExtensionContext context) { public Stream provideTestTemplateInvocationContexts( ExtensionContext context) { - // Search method annotated with @Parameters + // Search for methods annotated with @Parameters, preferring the most specific class final List parameterProviders = AnnotationSupport.findAnnotatedMethods( context.getRequiredTestClass(), Parameters.class, HierarchyTraversalMode.TOP_DOWN); - if (parameterProviders.isEmpty()) { + Method parameterProvider = + 
resolveParameterProvider(context.getRequiredTestClass(), parameterProviders); + if (parameterProvider == null) { throw new IllegalStateException("Cannot find any parameter provider"); } - if (parameterProviders.size() > 1) { - throw new IllegalStateException("Multiple parameter providers are found"); - } - Method parameterProvider = parameterProviders.get(0); // Get potential test name String testNameTemplate = parameterProvider.getAnnotation(Parameters.class).name(); @@ -127,6 +125,38 @@ public Stream provideTestTemplateInvocationContex parameterProvider)); } + private static Method resolveParameterProvider( + Class testClass, List parameterProviders) { + if (parameterProviders.isEmpty()) { + return null; + } + + Class current = testClass; + while (current != null) { + Method selected = null; + for (Method candidate : parameterProviders) { + if (candidate.getDeclaringClass().equals(current)) { + if (selected != null) { + throw new IllegalStateException("Multiple parameter providers are found"); + } + selected = candidate; + } + } + + if (selected != null) { + return selected; + } + + current = current.getSuperclass(); + } + + if (parameterProviders.size() > 1) { + throw new IllegalStateException("Multiple parameter providers are found"); + } + + return parameterProviders.get(0); + } + private static class FieldInjectingInvocationContext implements TestTemplateInvocationContext { private final String testNameTemplate; diff --git a/build.gradle b/build.gradle index 90675cfb98..495223cb0d 100644 --- a/build.gradle +++ b/build.gradle @@ -48,6 +48,12 @@ buildscript { String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion") String sparkVersionsString = System.getProperty("sparkVersions") != null ? System.getProperty("sparkVersions") : System.getProperty("defaultSparkVersions") List sparkVersions = sparkVersionsString != null && !sparkVersionsString.isEmpty() ? 
sparkVersionsString.split(",") : [] +// OpenHouse compatibility testing configuration +String openHouseCompatibilitySparkMajorVersion = findProperty("openhouseCompatibilitySparkMajorVersion") ?: '3.5' +ext { + openHouseCompatibilityCoordinate = findProperty("openhouseCompatibilityCoordinate") ?: + 'com.linkedin.openhouse:tables-test-fixtures-iceberg-1.5_2.12:0.0.+:uber' +} try { // apply these plugins in a try-catch block so that we can handle cases without .git directory @@ -128,6 +134,10 @@ allprojects { repositories { mavenCentral() mavenLocal() + // LinkedIn OpenHouse artifacts for compatibility testing + maven { + url "https://linkedin.jfrog.io/artifactory/openhouse" + } } } @@ -335,6 +345,14 @@ project(':iceberg-common') { } project(':iceberg-core') { + configurations { + openhouseCompatibilityRuntime { + canBeConsumed = false + canBeResolved = true + description = 'Dependencies required for OpenHouse compatibility verification' + } + } + test { useJUnitPlatform() } @@ -370,6 +388,39 @@ project(':iceberg-core') { testImplementation libs.esotericsoftware.kryo testImplementation libs.guava.testlib testImplementation libs.awaitility + + openhouseCompatibilityRuntime(rootProject.openHouseCompatibilityCoordinate) { + transitive = false + } + openhouseCompatibilityRuntime project(':iceberg-aws') + openhouseCompatibilityRuntime 'com.google.code.gson:gson:2.10.1' + openhouseCompatibilityRuntime 'com.zaxxer:HikariCP:4.0.3' + } + + tasks.register('openhouseCompatibilityTest', Test) { + useJUnitPlatform() + group = 'verification' + description = 'Runs the OpenHouse compatibility subset for iceberg-core' + testClassesDirs = sourceSets.test.output.classesDirs + classpath = sourceSets.test.runtimeClasspath + configurations.openhouseCompatibilityRuntime + systemProperty "iceberg.test.table.provider", + 'org.apache.iceberg.openhouse.OpenHouseTestTableProvider' + filter { + includeTestsMatching "org.apache.iceberg.TestWapWorkflow" + includeTestsMatching 
"org.apache.iceberg.TestFastAppend" + includeTestsMatching "org.apache.iceberg.TestMergeAppend" + includeTestsMatching "org.apache.iceberg.TestOverwrite" + includeTestsMatching "org.apache.iceberg.TestReplacePartitions" + includeTestsMatching "org.apache.iceberg.TestRowDelta" + includeTestsMatching "org.apache.iceberg.TestDeleteFiles" + includeTestsMatching "org.apache.iceberg.TestTransaction" + includeTestsMatching "org.apache.iceberg.TestCreateTransaction" + includeTestsMatching "org.apache.iceberg.TestRemoveSnapshots" + includeTestsMatching "org.apache.iceberg.TestSnapshot" + includeTestsMatching "org.apache.iceberg.TestSnapshotSelection" + includeTestsMatching "org.apache.iceberg.TestRewriteFiles" + includeTestsMatching "org.apache.iceberg.TestRewriteManifests" + } } } @@ -1030,6 +1081,13 @@ apply from: 'baseline.gradle' apply from: 'deploy.gradle' apply from: 'tasks.gradle' +tasks.register('openhouseCompatibility') { + group = "verification" + description = "Runs all OpenHouse compatibility tests" + dependsOn project(':iceberg-core').tasks.openhouseCompatibilityTest + dependsOn project(":iceberg-spark:iceberg-spark-${openHouseCompatibilitySparkMajorVersion}_${scalaVersion}").tasks.openhouseCompatibilityTest +} + project(':iceberg-bom') { apply plugin: 'java-platform' diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index c3db859101..c4a5091176 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.ServiceLoader; import java.util.Set; import java.util.UUID; import org.apache.iceberg.deletes.PositionDelete; @@ -43,6 +44,7 @@ import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Types; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; import 
org.junit.Before; import org.junit.Rule; @@ -171,6 +173,9 @@ public class TableTestBase { protected File metadataDir = null; public TestTables.TestTable table = null; + private static final TestTableProvider EXTERNAL_TABLE_PROVIDER = + initializeExternalTableProvider(); + protected final int formatVersion; @SuppressWarnings("checkstyle:MemberName") @@ -199,6 +204,20 @@ public void cleanupTables() { TestTables.clearTables(); } + @AfterClass + public static void shutdownExternalTableProvider() { + if (EXTERNAL_TABLE_PROVIDER != null) { + try { + EXTERNAL_TABLE_PROVIDER.afterAll(); + } catch (Exception e) { + throw new RuntimeException( + "Failed to shutdown external TestTableProvider: " + + EXTERNAL_TABLE_PROVIDER.getClass().getName(), + e); + } + } + } + List listManifestFiles() { return listManifestFiles(tableDir); } @@ -228,9 +247,60 @@ public static long countAllMetadataFiles(File tableDir) { } protected TestTables.TestTable create(Schema schema, PartitionSpec spec) { + if (EXTERNAL_TABLE_PROVIDER != null) { + return EXTERNAL_TABLE_PROVIDER.createTable(tableDir, "test", schema, spec, formatVersion); + } return TestTables.create(tableDir, "test", schema, spec, formatVersion); } + /** + * Loads table provider from external sources. + * Providers can be registered via: + * 1. System property: -Diceberg.test.table.provider=com.example.Provider + * 2. 
ServiceLoader: META-INF/services/org.apache.iceberg.TestTableProvider + */ + private static TestTableProvider initializeExternalTableProvider() { + TestTableProvider provider = discoverExternalTableProvider(); + if (provider != null) { + try { + provider.beforeAll(); + } catch (Exception e) { + throw new RuntimeException( + "Failed to initialize external TestTableProvider: " + + provider.getClass().getName(), + e); + } + } + return provider; + } + + private static TestTableProvider discoverExternalTableProvider() { + // Option 1: System property + String providerClass = System.getProperty("iceberg.test.table.provider"); + if (providerClass != null) { + try { + return (TestTableProvider) + Class.forName(providerClass).getDeclaredConstructor().newInstance(); + } catch (Exception e) { + System.err.println("Failed to load table provider from system property: " + e.getMessage()); + return null; + } + } + + // Option 2: ServiceLoader + try { + ServiceLoader loader = ServiceLoader.load(TestTableProvider.class); + Iterator iterator = loader.iterator(); + if (iterator.hasNext()) { + return iterator.next(); + } + } catch (Exception e) { + System.err.println("Failed to load table provider via ServiceLoader: " + e.getMessage()); + } + + return null; + } + TestTables.TestTable load() { return TestTables.load(tableDir, "test"); } diff --git a/core/src/test/java/org/apache/iceberg/TestTableProvider.java b/core/src/test/java/org/apache/iceberg/TestTableProvider.java new file mode 100644 index 0000000000..00273f4634 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestTableProvider.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.File; + +/** + * SPI interface for external table providers to inject custom table creation + * into Iceberg's core tests. + * + *

+ * <p>Implementations can be registered via:
+ *
+ * <ul>
+ *   <li>System property: -Diceberg.test.table.provider=com.example.Provider
+ *   <li>ServiceLoader: META-INF/services/org.apache.iceberg.TestTableProvider
+ * </ul>
+ *
+ */ +public interface TestTableProvider { + + /** + * Creates a test table with custom operations. + * + * @param dir the directory for the table + * @param name the table name + * @param schema the table schema + * @param spec the partition spec + * @param formatVersion the format version + * @return a test table instance + */ + TestTables.TestTable createTable( + File dir, String name, Schema schema, PartitionSpec spec, int formatVersion); + + /** + * Called before any tests run. + * + * @throws Exception if setup fails + */ + default void beforeAll() throws Exception {} + + /** + * Called after all tests complete. + * + * @throws Exception if cleanup fails + */ + default void afterAll() throws Exception {} +} diff --git a/gradle.properties b/gradle.properties index ea857e7f27..e5848647d2 100644 --- a/gradle.properties +++ b/gradle.properties @@ -24,5 +24,6 @@ systemProp.defaultSparkVersions=3.5 systemProp.knownSparkVersions=3.3,3.4,3.5 systemProp.defaultScalaVersion=2.12 systemProp.knownScalaVersions=2.12,2.13 +systemProp.openHouseCompatibilityCoordinate=com.linkedin.openhouse:tables-test-fixtures_2.12:0.0.1-SNAPSHOT:uber org.gradle.parallel=true org.gradle.jvmargs=-Xmx1024m diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java index ef8bd7ee0a..3e3fc30230 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java @@ -273,6 +273,12 @@ private void initConf(HiveConf conf, int port) { } private static void setupMetastoreDB(String dbURL) throws SQLException, IOException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + } catch (ClassNotFoundException e) { + throw new SQLException("Unable to load Derby embedded driver", e); + } + Connection connection = DriverManager.getConnection(dbURL); ScriptRunner scriptRunner = new 
ScriptRunner(connection, true, true); diff --git a/spark/openhouse.gradle b/spark/openhouse.gradle new file mode 100644 index 0000000000..a21eb754b5 --- /dev/null +++ b/spark/openhouse.gradle @@ -0,0 +1,75 @@ +// OpenHouse compatibility test configuration for Spark modules + +configurations { + openhouseCompatibilityRuntime { + canBeConsumed = false + canBeResolved = true + } +} + +dependencies { + openhouseCompatibilityRuntime(rootProject.openHouseCompatibilityCoordinate) { + transitive = false + } + openhouseCompatibilityRuntime project(':iceberg-aws') + openhouseCompatibilityRuntime 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.4' + openhouseCompatibilityRuntime 'com.google.code.gson:gson:2.10.1' + openhouseCompatibilityRuntime 'com.zaxxer:HikariCP:4.0.3' +} + +tasks.register('openhouseCompatibilityTest', Test) { + useJUnitPlatform() + group = 'verification' + description = "Runs the OpenHouse compatibility suite for Spark ${sparkMajorVersion}" + maxHeapSize '2560m' + testClassesDirs = sourceSets.test.output.classesDirs + classpath = sourceSets.test.runtimeClasspath + configurations.openhouseCompatibilityRuntime + + systemProperty "iceberg.test.spark.session.provider", + 'org.apache.iceberg.spark.openhouse.OpenHouseSparkITestProvider' + systemProperty "iceberg.test.catalog.provider", + 'org.apache.iceberg.spark.openhouse.OpenHouseSparkITestProvider' + systemProperty "iceberg.test.catalog.skip.defaults", "true" + systemProperty "spark.sql.sources.partitionOverwriteMode", "dynamic" + systemProperty "spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true" + systemProperty "spark.driver.bindAddress", "127.0.0.1" + + jvmArgs += [ + '--add-opens=java.base/java.lang=ALL-UNNAMED', + '--add-opens=java.base/java.lang.invoke=ALL-UNNAMED', + '--add-opens=java.base/java.lang.reflect=ALL-UNNAMED', + '--add-opens=java.base/java.io=ALL-UNNAMED', + '--add-opens=java.base/java.net=ALL-UNNAMED', + '--add-opens=java.base/java.nio=ALL-UNNAMED', + 
'--add-opens=java.base/java.util=ALL-UNNAMED', + '--add-opens=java.base/java.util.concurrent=ALL-UNNAMED', + '--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED', + '--add-opens=java.base/sun.nio.ch=ALL-UNNAMED', + '--add-opens=java.base/sun.nio.cs=ALL-UNNAMED', + '--add-opens=java.base/sun.security.action=ALL-UNNAMED', + '--add-opens=java.base/sun.util.calendar=ALL-UNNAMED', + '--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED' + ] + + filter { + includeTestsMatching "org.apache.iceberg.spark.sql.*" + includeTestsMatching "org.apache.iceberg.spark.actions.TestExpireSnapshotsAction" + includeTestsMatching "org.apache.iceberg.spark.source.TestSnapshotSelection" + } + + // Load exclusions from the fixtures jar + doFirst { + configurations.openhouseCompatibilityRuntime.resolve().each { file -> + if (file.name.contains("tables-test-fixtures")) { + zipTree(file).matching { include "openhouse-incompatible-tests.txt" }.each { f -> + f.eachLine { line -> + line = line.trim() + if (line && !line.startsWith('#')) { + filter.excludeTestsMatching line + } + } + } + } + } + } +} diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle index b29ba6761e..a74d13846f 100644 --- a/spark/v3.5/build.gradle +++ b/spark/v3.5/build.gradle @@ -19,6 +19,37 @@ String sparkMajorVersion = '3.5' String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion") +String openHouseFixturesCoordinate = gradle.startParameter.systemPropertiesArgs.get("openhouseFixturesCoordinate") +if (openHouseFixturesCoordinate != null) { + // The OpenHouse fixture is published as an executable Spring Boot jar by default. When consumers + // use it as a library we need the "-lib" (a.k.a. "uber") classifier that exposes the plain class + // files on the classpath. Automatically add the classifier when the caller only provides + // the typical groupId:artifactId:version coordinate. 
+ int colonCount = openHouseFixturesCoordinate.count(":") + if (colonCount == 2) { + openHouseFixturesCoordinate = openHouseFixturesCoordinate + ":uber" + } +} + +def sparkSessionProviderValue = + gradle.startParameter.systemPropertiesArgs.get("iceberg.test.spark.session.provider") +def catalogProviderValue = + gradle.startParameter.systemPropertiesArgs.get("iceberg.test.catalog.provider") + +def skipDefaultCatalogsValue = + gradle.startParameter.systemPropertiesArgs.get("iceberg.test.catalog.skip.defaults") + +if (openHouseFixturesCoordinate != null) { + if (sparkSessionProviderValue == null || sparkSessionProviderValue.isEmpty()) { + sparkSessionProviderValue = 'org.apache.iceberg.spark.openhouse.OpenHouseSparkITestProvider' + } + if (catalogProviderValue == null || catalogProviderValue.isEmpty()) { + catalogProviderValue = 'org.apache.iceberg.spark.openhouse.OpenHouseSparkITestProvider' + } + if (skipDefaultCatalogsValue == null || skipDefaultCatalogsValue.isEmpty()) { + skipDefaultCatalogsValue = 'true' + } +} def sparkProjects = [ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}"), @@ -39,6 +70,9 @@ configure(sparkProjects) { } project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { + // Set property on the SUB-PROJECT so it is visible to the applied script + ext.sparkMajorVersion = sparkMajorVersion + apply plugin: 'scala' apply plugin: 'com.github.alisiikh.scalastyle' @@ -49,6 +83,14 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { } } + configurations { + openhouseCompatibilityRuntime { + canBeConsumed = false + canBeResolved = true + description = 'Dependencies required for OpenHouse compatibility verification' + } + } + dependencies { implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') api project(':iceberg-api') @@ -104,16 +146,60 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { testImplementation project(path: 
':iceberg-data', configuration: 'testArtifacts') testImplementation libs.sqlite.jdbc testImplementation libs.awaitility + + if (openHouseFixturesCoordinate != null) { + testRuntimeOnly(openHouseFixturesCoordinate) { + exclude group: 'org.apache.iceberg' + exclude group: 'com.linkedin.iceberg' + exclude group: 'org.apache.logging.log4j', module: 'log4j-slf4j2-impl' + exclude group: 'org.apache.logging.log4j', module: 'log4j-to-slf4j' + exclude group: 'com.fasterxml.jackson.core' + exclude group: 'com.fasterxml.jackson.module' + exclude group: 'com.fasterxml.jackson.datatype' + exclude group: 'com.fasterxml.jackson.dataformat' + } + testRuntimeOnly project(':iceberg-aws') + testRuntimeOnly 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.4' + } + + openhouseCompatibilityRuntime(rootProject.openHouseCompatibilityCoordinate) { + transitive = false + } + openhouseCompatibilityRuntime project(':iceberg-aws') + openhouseCompatibilityRuntime 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.4' + openhouseCompatibilityRuntime 'com.google.code.gson:gson:2.10.1' + openhouseCompatibilityRuntime 'com.zaxxer:HikariCP:4.0.3' + } + + if (openHouseFixturesCoordinate != null) { + configurations { + testRuntimeClasspath { + exclude group: 'org.apache.logging.log4j', module: 'log4j-slf4j2-impl' + } + } } test { useJUnitPlatform() } - tasks.withType(Test) { + tasks.withType(Test).configureEach { // Vectorized reads need more memory maxHeapSize '2560m' + + if (sparkSessionProviderValue != null && !sparkSessionProviderValue.isEmpty()) { + systemProperty "iceberg.test.spark.session.provider", sparkSessionProviderValue + } + + if (catalogProviderValue != null && !catalogProviderValue.isEmpty()) { + systemProperty "iceberg.test.catalog.provider", catalogProviderValue + } + if (skipDefaultCatalogsValue != null && !skipDefaultCatalogsValue.isEmpty()) { + systemProperty "iceberg.test.catalog.skip.defaults", skipDefaultCatalogsValue + } } + + apply from: 
"$projectDir/../../openhouse.gradle" } project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") { @@ -214,6 +300,12 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio exclude group: 'org.scala-lang' exclude group: 'org.scala-lang.modules' } + + integrationImplementation { + // Log4j routing is handled via slf4j-simple/log4j-to-slf4j. Drop Spark's slf4j2 bridge to + // avoid Log4j enforcing mutual exclusivity at runtime. + exclude group: 'org.apache.logging.log4j', module: 'log4j-slf4j2-impl' + } } dependencies { @@ -246,6 +338,15 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') + if (openHouseFixturesCoordinate != null) { + integrationImplementation(openHouseFixturesCoordinate) { + // The fixtures already include Iceberg bits transitively; drop them to avoid conflicts + exclude group: 'org.apache.iceberg' + exclude group: 'com.linkedin.iceberg' + // Avoid conflicting SLF4J bridges; Iceberg already routes logging through log4j-to-slf4j + exclude group: 'org.apache.logging.log4j', module: 'log4j-slf4j2-impl' + } + } // Not allowed on our classpath, only the runtime jar is allowed integrationCompileOnly project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") integrationCompileOnly project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") @@ -292,7 +393,9 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio useJUnitPlatform() description = "Test Spark3 Runtime Jar against Spark ${sparkMajorVersion}" group = 
"verification" + if (project.hasProperty('extraJvmArgs')) { jvmArgs += project.property('extraJvmArgs') + } testClassesDirs = sourceSets.integration.output.classesDirs classpath = sourceSets.integration.runtimeClasspath + files(shadowJar.archiveFile.get().asFile.path) inputs.file(shadowJar.archiveFile.get().asFile.path) @@ -300,8 +403,20 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio integrationTest.dependsOn shadowJar check.dependsOn integrationTest + tasks.withType(Test).configureEach { + if (sparkSessionProviderValue != null && !sparkSessionProviderValue.isEmpty()) { + systemProperty "iceberg.test.spark.session.provider", sparkSessionProviderValue + } + + if (catalogProviderValue != null && !catalogProviderValue.isEmpty()) { + systemProperty "iceberg.test.catalog.provider", catalogProviderValue + } + if (skipDefaultCatalogsValue != null && !skipDefaultCatalogsValue.isEmpty()) { + systemProperty "iceberg.test.catalog.skip.defaults", skipDefaultCatalogsValue + } + } + jar { enabled = false } } - diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java index 8e167b7f73..a52c322fb8 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java @@ -42,7 +42,7 @@ public static void startMetastoreAndSpark() { metastore.start(); TestBase.hiveConf = metastore.hiveConf(); - TestBase.spark = + SparkSession.Builder builder = SparkSession.builder() .master("local[2]") .config("spark.testing", "true") @@ -54,8 +54,9 @@ public static void startMetastoreAndSpark() { .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") .config( SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), 
String.valueOf(RANDOM.nextBoolean())) - .enableHiveSupport() - .getOrCreate(); + .enableHiveSupport(); + + TestBase.spark = buildSparkSession(builder, hiveConf); TestBase.catalog = (HiveCatalog) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java index 6203bf89bf..6be9b6b771 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java @@ -19,35 +19,59 @@ package org.apache.iceberg.spark; import java.nio.file.Path; -import org.apache.iceberg.ParameterizedTestExtension; -import org.apache.iceberg.Parameters; -import org.junit.jupiter.api.extension.ExtendWith; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import org.junit.jupiter.api.io.TempDir; -@ExtendWith(ParameterizedTestExtension.class) public abstract class CatalogTestBase extends TestBaseWithCatalog { + @TempDir protected Path temp; + // these parameters are broken out to avoid changes that need to modify lots of test suites - @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") - protected static Object[][] parameters() { - return new Object[][] { - { - SparkCatalogConfig.HIVE.catalogName(), - SparkCatalogConfig.HIVE.implementation(), - SparkCatalogConfig.HIVE.properties() - }, - { - SparkCatalogConfig.HADOOP.catalogName(), - SparkCatalogConfig.HADOOP.implementation(), - SparkCatalogConfig.HADOOP.properties() - }, - { - SparkCatalogConfig.SPARK.catalogName(), - SparkCatalogConfig.SPARK.implementation(), - SparkCatalogConfig.SPARK.properties() + protected static Object[][] baseCatalogParameters() { + List params = new ArrayList<>(); + + // Check if we should skip default catalogs + boolean skipDefaults = Boolean.getBoolean("iceberg.test.catalog.skip.defaults"); + + // Load external catalog provider if specified + String 
providerClassName = System.getProperty("iceberg.test.catalog.provider"); + if (providerClassName != null) { + try { + TestCatalogProvider provider = + (TestCatalogProvider) + Class.forName(providerClassName).getDeclaredConstructor().newInstance(); + // Must call beforeAll() to initialize the provider (e.g., start servers) + provider.beforeAll(); + params.addAll(Arrays.asList(provider.getCatalogConfigurations())); + } catch (Exception e) { + throw new RuntimeException("Failed to load catalog provider: " + providerClassName, e); } - }; - } + } - @TempDir protected Path temp; + // Add default catalogs unless skipped + if (!skipDefaults) { + params.add( + new Object[] { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + SparkCatalogConfig.HIVE.properties() + }); + params.add( + new Object[] { + SparkCatalogConfig.HADOOP.catalogName(), + SparkCatalogConfig.HADOOP.implementation(), + SparkCatalogConfig.HADOOP.properties() + }); + params.add( + new Object[] { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties() + }); + } + + return params.toArray(new Object[0][]); + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java index e7d5a0f039..e7a2762c4c 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java @@ -27,11 +27,13 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TimeZone; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.ServiceLoader; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.ContentFile; @@ -59,11 
+61,15 @@ public abstract class TestBase extends SparkTestHelperBase { + private static final String SPARK_SESSION_PROVIDER_PROPERTY = + "iceberg.test.spark.session.provider"; + protected static TestHiveMetastore metastore = null; protected static HiveConf hiveConf = null; protected static SparkSession spark = null; protected static JavaSparkContext sparkContext = null; protected static HiveCatalog catalog = null; + private static TestSparkSessionProvider sparkSessionProvider = null; @BeforeAll public static void startMetastoreAndSpark() { @@ -71,14 +77,8 @@ public static void startMetastoreAndSpark() { metastore.start(); TestBase.hiveConf = metastore.hiveConf(); - TestBase.spark = - SparkSession.builder() - .master("local[2]") - .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") - .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) - .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") - .enableHiveSupport() - .getOrCreate(); + SparkSession.Builder defaultBuilder = createDefaultSparkBuilder(hiveConf); + TestBase.spark = buildSparkSession(defaultBuilder, hiveConf); TestBase.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -106,6 +106,71 @@ public static void stopMetastoreAndSpark() throws Exception { TestBase.spark = null; TestBase.sparkContext = null; } + if (sparkSessionProvider != null) { + try { + sparkSessionProvider.afterAll(); + } catch (Exception e) { + throw new RuntimeException("Failed to shutdown custom Spark session provider", e); + } finally { + sparkSessionProvider = null; + } + } + } + + protected static SparkSession.Builder createDefaultSparkBuilder(HiveConf hiveConf) { + return SparkSession.builder() + .master("local[2]") + .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") + .config("spark.hadoop." 
+ METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) + .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") + .enableHiveSupport(); + } + + protected static SparkSession buildSparkSession( + SparkSession.Builder builder, HiveConf hiveConf) { + sparkSessionProvider = loadSparkSessionProvider(); + if (sparkSessionProvider != null) { + try { + sparkSessionProvider.beforeAll(); + SparkSession provided = sparkSessionProvider.createSparkSession(builder, hiveConf); + return provided != null ? provided : builder.getOrCreate(); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize custom Spark session provider", e); + } + } else { + return builder.getOrCreate(); + } + } + + private static TestSparkSessionProvider loadSparkSessionProvider() { + String providerClass = System.getProperty(SPARK_SESSION_PROVIDER_PROPERTY); + if (providerClass != null && !providerClass.isEmpty()) { + return instantiateSparkSessionProvider(providerClass); + } + + try { + ServiceLoader loader = + ServiceLoader.load(TestSparkSessionProvider.class); + Iterator iterator = loader.iterator(); + if (iterator.hasNext()) { + return iterator.next(); + } + } catch (Exception e) { + System.err.println( + "Failed to initialize ServiceLoader for TestSparkSessionProvider: " + e.getMessage()); + } + + return null; + } + + private static TestSparkSessionProvider instantiateSparkSessionProvider(String className) { + try { + return (TestSparkSessionProvider) + Class.forName(className).getDeclaredConstructor().newInstance(); + } catch (Exception e) { + throw new RuntimeException( + "Failed to instantiate TestSparkSessionProvider: " + className, e); + } } protected long waitUntilAfter(long timestampMillis) { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java index c3c958abf0..e178466b68 100644 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java @@ -22,10 +22,17 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.ServiceLoader; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Parameters; @@ -37,6 +44,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.util.PropertyUtil; +import org.apache.spark.sql.internal.SQLConf; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -49,6 +57,15 @@ public abstract class TestBaseWithCatalog extends TestBase { @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") protected static Object[][] parameters() { + List params = new ArrayList<>(); + if (!Boolean.getBoolean("iceberg.test.catalog.skip.defaults")) { + params.addAll(Arrays.asList(baseCatalogParameters())); + } + params.addAll(Arrays.asList(loadExternalCatalogs())); + return params.toArray(new Object[0][]); + } + + protected static Object[][] baseCatalogParameters() { return new Object[][] { { SparkCatalogConfig.HADOOP.catalogName(), @@ -58,6 +75,44 @@ protected static Object[][] parameters() { }; } + private static Object[][] loadExternalCatalogs() { + List externalCatalogs = new ArrayList<>(); + + // Option 1: System property + String providerClass = System.getProperty("iceberg.test.catalog.provider"); + if (providerClass != null) { + try { + TestCatalogProvider provider = + 
(TestCatalogProvider) + Class.forName(providerClass).getDeclaredConstructor().newInstance(); + provider.beforeAll(); + externalCatalogs.addAll(Arrays.asList(provider.getCatalogConfigurations())); + } catch (Exception e) { + System.err.println( + "Failed to load catalog provider from system property: " + e.getMessage()); + } + } + + // Option 2: ServiceLoader + try { + ServiceLoader loader = ServiceLoader.load(TestCatalogProvider.class); + Iterator iterator = loader.iterator(); + while (iterator.hasNext()) { + try { + TestCatalogProvider provider = iterator.next(); + provider.beforeAll(); + externalCatalogs.addAll(Arrays.asList(provider.getCatalogConfigurations())); + } catch (Exception e) { + System.err.println("Failed to load catalog provider via ServiceLoader: " + e.getMessage()); + } + } + } catch (Exception e) { + System.err.println("Failed to initialize ServiceLoader for TestCatalogProvider: " + e.getMessage()); + } + + return externalCatalogs.toArray(new Object[0][]); + } + @BeforeAll public static void createWarehouse() throws IOException { TestBaseWithCatalog.warehouse = File.createTempFile("warehouse", null); @@ -91,15 +146,21 @@ public static void dropWarehouse() throws IOException { @BeforeEach public void before() { - this.validationCatalog = - catalogName.equals("testhadoop") - ? new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse) - : catalog; + Catalog configuredValidationCatalog = + loadConfiguredValidationCatalog(catalogName, catalogConfig); + if (configuredValidationCatalog != null) { + this.validationCatalog = configuredValidationCatalog; + } else { + this.validationCatalog = + catalogName.equals("testhadoop") + ? new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse) + : catalog; + } this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog; - spark.conf().set("spark.sql.catalog." + catalogName, implementation); + setCatalogConf("spark.sql.catalog." 
+ catalogName, implementation); catalogConfig.forEach( - (key, value) -> spark.conf().set("spark.sql.catalog." + catalogName + "." + key, value)); + (key, value) -> setCatalogConf("spark.sql.catalog." + catalogName + "." + key, value)); if ("hadoop".equalsIgnoreCase(catalogConfig.get("type"))) { spark.conf().set("spark.sql.catalog." + catalogName + ".warehouse", "file:" + warehouse); @@ -141,4 +202,24 @@ protected void configurePlanningMode(String table, PlanningMode planningMode) { TableProperties.DELETE_PLANNING_MODE, planningMode.modeName()); } + + private Catalog loadConfiguredValidationCatalog(String name, Map configuration) { + String catalogImpl = configuration.get(CatalogProperties.CATALOG_IMPL); + if (catalogImpl == null || catalogImpl.isEmpty()) { + return null; + } + + Map configCopy = new HashMap<>(configuration); + return CatalogUtil.loadCatalog( + catalogImpl, name + "-validation", configCopy, spark.sessionState().newHadoopConf()); + } + + private void setCatalogConf(String key, String value) { + spark.conf().set(key, value); + try { + SQLConf.get().setConfString(key, value); + } catch (IllegalArgumentException e) { + // Some keys may not be valid SQLConf keys, which is fine + } + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestCatalogProvider.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestCatalogProvider.java new file mode 100644 index 0000000000..27e480ac23 --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestCatalogProvider.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +/** + * SPI interface for external catalog providers to inject catalog configurations + * into Iceberg's parameterized tests. + * + *

<p>Implementations can be registered via:
+ *
+ * <ul>
+ *   <li>System property: -Diceberg.test.catalog.provider=com.example.Provider</li>
+ *   <li>ServiceLoader: META-INF/services/org.apache.iceberg.spark.TestCatalogProvider</li>
+ * </ul>
+ */ +public interface TestCatalogProvider { + + /** + * Returns catalog configurations to test against. + * Each array contains: [catalogName, implementation, configMap] + * + * @return array of catalog configurations + */ + Object[][] getCatalogConfigurations(); + + /** + * Called before any tests run. Use for setup (e.g., starting servers). + * + * @throws Exception if setup fails + */ + default void beforeAll() throws Exception {} + + /** + * Called after all tests complete. Use for cleanup (e.g., stopping servers). + * + * @throws Exception if cleanup fails + */ + default void afterAll() throws Exception {} +} + diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionProvider.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionProvider.java new file mode 100644 index 0000000000..66c7b768af --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionProvider.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.spark.sql.SparkSession; + +/** + * Service Provider Interface that allows downstream integrations (e.g. catalog servers) + * to supply custom {@link SparkSession}s for Iceberg's Spark-based test suites. + * + *

<p>Implementations can be wired using either the {@code iceberg.test.spark.session.provider} + * system property or via {@link java.util.ServiceLoader} registration at + * {@code META-INF/services/org.apache.iceberg.spark.TestSparkSessionProvider}. + * + *

<p>The lifecycle methods mirror JUnit constructs so providers can eagerly start/stop the + * infrastructure they need (embedded services, custom Spark extensions, etc.). + */ +public interface TestSparkSessionProvider { + + /** + * Gives implementations a chance to perform any global setup before a Spark session is created. + */ + default void beforeAll() throws Exception {} + + /** + * Implementations must return the {@link SparkSession} Iceberg tests should use. + * + *
+ * <p>
The method receives the default builder Iceberg would normally use along with the HiveConf + * backing the embedded metastore. Implementations can either mutate & invoke the builder or + * ignore it entirely and construct their own session. + * + * @param defaultBuilder pre-configured builder that points at Iceberg's embedded HMS + * @param hiveConf Hive configuration backing the test metastore + * @return a SparkSession ready for running Iceberg's tests + */ + SparkSession createSparkSession(SparkSession.Builder defaultBuilder, HiveConf hiveConf) + throws Exception; + + /** + * Gives implementations a chance to clean up resources after the test Spark session stops. + */ + default void afterAll() throws Exception {} +} + + From 76c838c6e6c86b4cfda290e842d7eaa7c4053a78 Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Wed, 26 Nov 2025 01:10:27 -0800 Subject: [PATCH 2/7] chore: cleanup Gradle and TestBase code - Remove legacy openHouseFixturesCoordinate approach with exclusions - Use openhouseCompatibilityRuntime with transitive=false (cleaner) - Remove hardcoded SNAPSHOT coordinate from gradle.properties - Simplify TestBase.loadSparkSessionProvider using ServiceLoader.findFirst() - Remove redundant system property handling from spark/v3.5/build.gradle --- gradle.properties | 1 - spark/v3.5/build.gradle | 97 +------------------ .../org/apache/iceberg/spark/TestBase.java | 59 ++++------- 3 files changed, 24 insertions(+), 133 deletions(-) diff --git a/gradle.properties b/gradle.properties index e5848647d2..ea857e7f27 100644 --- a/gradle.properties +++ b/gradle.properties @@ -24,6 +24,5 @@ systemProp.defaultSparkVersions=3.5 systemProp.knownSparkVersions=3.3,3.4,3.5 systemProp.defaultScalaVersion=2.12 systemProp.knownScalaVersions=2.12,2.13 -systemProp.openHouseCompatibilityCoordinate=com.linkedin.openhouse:tables-test-fixtures_2.12:0.0.1-SNAPSHOT:uber org.gradle.parallel=true org.gradle.jvmargs=-Xmx1024m diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle 
index a74d13846f..fe4c26772f 100644 --- a/spark/v3.5/build.gradle +++ b/spark/v3.5/build.gradle @@ -19,37 +19,6 @@ String sparkMajorVersion = '3.5' String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion") -String openHouseFixturesCoordinate = gradle.startParameter.systemPropertiesArgs.get("openhouseFixturesCoordinate") -if (openHouseFixturesCoordinate != null) { - // The OpenHouse fixture is published as an executable Spring Boot jar by default. When consumers - // use it as a library we need the "-lib" (a.k.a. "uber") classifier that exposes the plain class - // files on the classpath. Automatically add the classifier when the caller only provides - // the typical groupId:artifactId:version coordinate. - int colonCount = openHouseFixturesCoordinate.count(":") - if (colonCount == 2) { - openHouseFixturesCoordinate = openHouseFixturesCoordinate + ":uber" - } -} - -def sparkSessionProviderValue = - gradle.startParameter.systemPropertiesArgs.get("iceberg.test.spark.session.provider") -def catalogProviderValue = - gradle.startParameter.systemPropertiesArgs.get("iceberg.test.catalog.provider") - -def skipDefaultCatalogsValue = - gradle.startParameter.systemPropertiesArgs.get("iceberg.test.catalog.skip.defaults") - -if (openHouseFixturesCoordinate != null) { - if (sparkSessionProviderValue == null || sparkSessionProviderValue.isEmpty()) { - sparkSessionProviderValue = 'org.apache.iceberg.spark.openhouse.OpenHouseSparkITestProvider' - } - if (catalogProviderValue == null || catalogProviderValue.isEmpty()) { - catalogProviderValue = 'org.apache.iceberg.spark.openhouse.OpenHouseSparkITestProvider' - } - if (skipDefaultCatalogsValue == null || skipDefaultCatalogsValue.isEmpty()) { - skipDefaultCatalogsValue = 'true' - } -} def sparkProjects = [ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}"), @@ -147,21 +116,8 @@ 
project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { testImplementation libs.sqlite.jdbc testImplementation libs.awaitility - if (openHouseFixturesCoordinate != null) { - testRuntimeOnly(openHouseFixturesCoordinate) { - exclude group: 'org.apache.iceberg' - exclude group: 'com.linkedin.iceberg' - exclude group: 'org.apache.logging.log4j', module: 'log4j-slf4j2-impl' - exclude group: 'org.apache.logging.log4j', module: 'log4j-to-slf4j' - exclude group: 'com.fasterxml.jackson.core' - exclude group: 'com.fasterxml.jackson.module' - exclude group: 'com.fasterxml.jackson.datatype' - exclude group: 'com.fasterxml.jackson.dataformat' - } - testRuntimeOnly project(':iceberg-aws') - testRuntimeOnly 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.4' - } - + // OpenHouse compatibility testing dependencies - uber jar with transitive=false + // avoids classpath conflicts since the jar is self-contained with shaded dependencies openhouseCompatibilityRuntime(rootProject.openHouseCompatibilityCoordinate) { transitive = false } @@ -171,14 +127,6 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { openhouseCompatibilityRuntime 'com.zaxxer:HikariCP:4.0.3' } - if (openHouseFixturesCoordinate != null) { - configurations { - testRuntimeClasspath { - exclude group: 'org.apache.logging.log4j', module: 'log4j-slf4j2-impl' - } - } - } - test { useJUnitPlatform() } @@ -186,17 +134,6 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { tasks.withType(Test).configureEach { // Vectorized reads need more memory maxHeapSize '2560m' - - if (sparkSessionProviderValue != null && !sparkSessionProviderValue.isEmpty()) { - systemProperty "iceberg.test.spark.session.provider", sparkSessionProviderValue - } - - if (catalogProviderValue != null && !catalogProviderValue.isEmpty()) { - systemProperty "iceberg.test.catalog.provider", catalogProviderValue - } - if (skipDefaultCatalogsValue != null && 
!skipDefaultCatalogsValue.isEmpty()) { - systemProperty "iceberg.test.catalog.skip.defaults", skipDefaultCatalogsValue - } } apply from: "$projectDir/../../openhouse.gradle" @@ -300,12 +237,6 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio exclude group: 'org.scala-lang' exclude group: 'org.scala-lang.modules' } - - integrationImplementation { - // Log4j routing is handled via slf4j-simple/log4j-to-slf4j. Drop Spark's slf4j2 bridge to - // avoid Log4j enforcing mutual exclusivity at runtime. - exclude group: 'org.apache.logging.log4j', module: 'log4j-slf4j2-impl' - } } dependencies { @@ -338,15 +269,6 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') - if (openHouseFixturesCoordinate != null) { - integrationImplementation(openHouseFixturesCoordinate) { - // The fixtures already include Iceberg bits transitively; drop them to avoid conflicts - exclude group: 'org.apache.iceberg' - exclude group: 'com.linkedin.iceberg' - // Avoid conflicting SLF4J bridges; Iceberg already routes logging through log4j-to-slf4j - exclude group: 'org.apache.logging.log4j', module: 'log4j-slf4j2-impl' - } - } // Not allowed on our classpath, only the runtime jar is allowed integrationCompileOnly project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") integrationCompileOnly project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") @@ -394,7 +316,7 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio description = "Test Spark3 Runtime Jar against Spark 
${sparkMajorVersion}" group = "verification" if (project.hasProperty('extraJvmArgs')) { - jvmArgs += project.property('extraJvmArgs') + jvmArgs += project.property('extraJvmArgs') } testClassesDirs = sourceSets.integration.output.classesDirs classpath = sourceSets.integration.runtimeClasspath + files(shadowJar.archiveFile.get().asFile.path) @@ -403,19 +325,6 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio integrationTest.dependsOn shadowJar check.dependsOn integrationTest - tasks.withType(Test).configureEach { - if (sparkSessionProviderValue != null && !sparkSessionProviderValue.isEmpty()) { - systemProperty "iceberg.test.spark.session.provider", sparkSessionProviderValue - } - - if (catalogProviderValue != null && !catalogProviderValue.isEmpty()) { - systemProperty "iceberg.test.catalog.provider", catalogProviderValue - } - if (skipDefaultCatalogsValue != null && !skipDefaultCatalogsValue.isEmpty()) { - systemProperty "iceberg.test.catalog.skip.defaults", skipDefaultCatalogsValue - } - } - jar { enabled = false } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java index e7a2762c4c..4d1f3f8d95 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java @@ -27,13 +27,12 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.ServiceLoader; import java.util.TimeZone; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import java.util.ServiceLoader; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.ContentFile; @@ -117,60 +116,44 @@ public static void stopMetastoreAndSpark() throws Exception { } } - protected 
static SparkSession.Builder createDefaultSparkBuilder(HiveConf hiveConf) { + protected static SparkSession.Builder createDefaultSparkBuilder(HiveConf conf) { return SparkSession.builder() .master("local[2]") .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") - .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) + .config("spark.hadoop." + METASTOREURIS.varname, conf.get(METASTOREURIS.varname)) .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") .enableHiveSupport(); } - protected static SparkSession buildSparkSession( - SparkSession.Builder builder, HiveConf hiveConf) { + protected static SparkSession buildSparkSession(SparkSession.Builder builder, HiveConf conf) { sparkSessionProvider = loadSparkSessionProvider(); - if (sparkSessionProvider != null) { - try { - sparkSessionProvider.beforeAll(); - SparkSession provided = sparkSessionProvider.createSparkSession(builder, hiveConf); - return provided != null ? provided : builder.getOrCreate(); - } catch (Exception e) { - throw new RuntimeException("Failed to initialize custom Spark session provider", e); - } - } else { + if (sparkSessionProvider == null) { return builder.getOrCreate(); } + + try { + sparkSessionProvider.beforeAll(); + SparkSession session = sparkSessionProvider.createSparkSession(builder, conf); + return session != null ? 
session : builder.getOrCreate(); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize custom Spark session provider", e); + } } private static TestSparkSessionProvider loadSparkSessionProvider() { + // Check system property first for explicit override String providerClass = System.getProperty(SPARK_SESSION_PROVIDER_PROPERTY); if (providerClass != null && !providerClass.isEmpty()) { - return instantiateSparkSessionProvider(providerClass); - } - - try { - ServiceLoader loader = - ServiceLoader.load(TestSparkSessionProvider.class); - Iterator iterator = loader.iterator(); - if (iterator.hasNext()) { - return iterator.next(); + try { + return (TestSparkSessionProvider) + Class.forName(providerClass).getDeclaredConstructor().newInstance(); + } catch (ReflectiveOperationException e) { + throw new RuntimeException("Failed to instantiate " + providerClass, e); } - } catch (Exception e) { - System.err.println( - "Failed to initialize ServiceLoader for TestSparkSessionProvider: " + e.getMessage()); } - return null; - } - - private static TestSparkSessionProvider instantiateSparkSessionProvider(String className) { - try { - return (TestSparkSessionProvider) - Class.forName(className).getDeclaredConstructor().newInstance(); - } catch (Exception e) { - throw new RuntimeException( - "Failed to instantiate TestSparkSessionProvider: " + className, e); - } + // Fall back to SPI discovery + return ServiceLoader.load(TestSparkSessionProvider.class).findFirst().orElse(null); } protected long waitUntilAfter(long timestampMillis) { From ee7c60de076e63f8f392e432fd1853cafe87fc72 Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Wed, 26 Nov 2025 01:21:42 -0800 Subject: [PATCH 3/7] chore: refactor test infrastructure and remove duplication - TestBaseWithCatalog: centralize SPI loading with loadExternalCatalogProviders() - CatalogTestBase: just override defaultCatalogParameters(), inherit SPI logic - spark/v3.5/build.gradle: remove duplicate 
openhouseCompatibilityRuntime config - spark/openhouse.gradle: single source of truth for OpenHouse test config - Use consistent single quotes and add descriptive comments --- spark/openhouse.gradle | 38 +++-- spark/v3.5/build.gradle | 18 --- .../apache/iceberg/spark/CatalogTestBase.java | 68 +++------ .../iceberg/spark/TestBaseWithCatalog.java | 135 +++++++++--------- 4 files changed, 110 insertions(+), 149 deletions(-) diff --git a/spark/openhouse.gradle b/spark/openhouse.gradle index a21eb754b5..c2e22021e2 100644 --- a/spark/openhouse.gradle +++ b/spark/openhouse.gradle @@ -1,13 +1,16 @@ // OpenHouse compatibility test configuration for Spark modules +// Applied to spark modules that need OpenHouse integration testing configurations { openhouseCompatibilityRuntime { canBeConsumed = false canBeResolved = true + description = 'Runtime dependencies for OpenHouse compatibility tests' } } dependencies { + // Uber jar with transitive=false to avoid classpath conflicts openhouseCompatibilityRuntime(rootProject.openHouseCompatibilityCoordinate) { transitive = false } @@ -18,22 +21,28 @@ dependencies { } tasks.register('openhouseCompatibilityTest', Test) { - useJUnitPlatform() group = 'verification' - description = "Runs the OpenHouse compatibility suite for Spark ${sparkMajorVersion}" + description = "Run Iceberg tests against OpenHouse catalog (Spark ${sparkMajorVersion})" + + useJUnitPlatform() maxHeapSize '2560m' + testClassesDirs = sourceSets.test.output.classesDirs classpath = sourceSets.test.runtimeClasspath + configurations.openhouseCompatibilityRuntime - systemProperty "iceberg.test.spark.session.provider", + // Configure SPI providers + systemProperty 'iceberg.test.spark.session.provider', 'org.apache.iceberg.spark.openhouse.OpenHouseSparkITestProvider' - systemProperty "iceberg.test.catalog.provider", + systemProperty 'iceberg.test.catalog.provider', 'org.apache.iceberg.spark.openhouse.OpenHouseSparkITestProvider' - systemProperty 
"iceberg.test.catalog.skip.defaults", "true" - systemProperty "spark.sql.sources.partitionOverwriteMode", "dynamic" - systemProperty "spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true" - systemProperty "spark.driver.bindAddress", "127.0.0.1" + systemProperty 'iceberg.test.catalog.skip.defaults', 'true' + + // Spark configuration + systemProperty 'spark.sql.sources.partitionOverwriteMode', 'dynamic' + systemProperty 'spark.sql.legacy.respectNullabilityInTextDatasetConversion', 'true' + systemProperty 'spark.driver.bindAddress', '127.0.0.1' + // JVM args for Java 17+ compatibility jvmArgs += [ '--add-opens=java.base/java.lang=ALL-UNNAMED', '--add-opens=java.base/java.lang.invoke=ALL-UNNAMED', @@ -51,17 +60,18 @@ tasks.register('openhouseCompatibilityTest', Test) { '--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED' ] + // Test filter - include SQL tests and specific action/source tests filter { - includeTestsMatching "org.apache.iceberg.spark.sql.*" - includeTestsMatching "org.apache.iceberg.spark.actions.TestExpireSnapshotsAction" - includeTestsMatching "org.apache.iceberg.spark.source.TestSnapshotSelection" + includeTestsMatching 'org.apache.iceberg.spark.sql.*' + includeTestsMatching 'org.apache.iceberg.spark.actions.TestExpireSnapshotsAction' + includeTestsMatching 'org.apache.iceberg.spark.source.TestSnapshotSelection' } - // Load exclusions from the fixtures jar + // Load exclusions from the fixtures jar at execution time doFirst { configurations.openhouseCompatibilityRuntime.resolve().each { file -> - if (file.name.contains("tables-test-fixtures")) { - zipTree(file).matching { include "openhouse-incompatible-tests.txt" }.each { f -> + if (file.name.contains('tables-test-fixtures')) { + zipTree(file).matching { include 'openhouse-incompatible-tests.txt' }.each { f -> f.eachLine { line -> line = line.trim() if (line && !line.startsWith('#')) { diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle index fe4c26772f..95c21b2efd 
100644 --- a/spark/v3.5/build.gradle +++ b/spark/v3.5/build.gradle @@ -52,14 +52,6 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { } } - configurations { - openhouseCompatibilityRuntime { - canBeConsumed = false - canBeResolved = true - description = 'Dependencies required for OpenHouse compatibility verification' - } - } - dependencies { implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') api project(':iceberg-api') @@ -115,16 +107,6 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { testImplementation project(path: ':iceberg-data', configuration: 'testArtifacts') testImplementation libs.sqlite.jdbc testImplementation libs.awaitility - - // OpenHouse compatibility testing dependencies - uber jar with transitive=false - // avoids classpath conflicts since the jar is self-contained with shaded dependencies - openhouseCompatibilityRuntime(rootProject.openHouseCompatibilityCoordinate) { - transitive = false - } - openhouseCompatibilityRuntime project(':iceberg-aws') - openhouseCompatibilityRuntime 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.4' - openhouseCompatibilityRuntime 'com.google.code.gson:gson:2.10.1' - openhouseCompatibilityRuntime 'com.zaxxer:HikariCP:4.0.3' } test { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java index 6be9b6b771..5695943eab 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java @@ -19,59 +19,33 @@ package org.apache.iceberg.spark; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import org.junit.jupiter.api.io.TempDir; public abstract class CatalogTestBase extends TestBaseWithCatalog { @TempDir protected Path temp; - // these parameters 
are broken out to avoid changes that need to modify lots of test suites - protected static Object[][] baseCatalogParameters() { - List params = new ArrayList<>(); - - // Check if we should skip default catalogs - boolean skipDefaults = Boolean.getBoolean("iceberg.test.catalog.skip.defaults"); - - // Load external catalog provider if specified - String providerClassName = System.getProperty("iceberg.test.catalog.provider"); - if (providerClassName != null) { - try { - TestCatalogProvider provider = - (TestCatalogProvider) - Class.forName(providerClassName).getDeclaredConstructor().newInstance(); - // Must call beforeAll() to initialize the provider (e.g., start servers) - provider.beforeAll(); - params.addAll(Arrays.asList(provider.getCatalogConfigurations())); - } catch (Exception e) { - throw new RuntimeException("Failed to load catalog provider: " + providerClassName, e); + /** + * Override to include HIVE and SPARK catalogs in addition to HADOOP. + * External catalog providers are loaded by the parent class. 
+ */ + protected static Object[][] defaultCatalogParameters() { + return new Object[][] { + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + SparkCatalogConfig.HIVE.properties() + }, + { + SparkCatalogConfig.HADOOP.catalogName(), + SparkCatalogConfig.HADOOP.implementation(), + SparkCatalogConfig.HADOOP.properties() + }, + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties() } - } - - // Add default catalogs unless skipped - if (!skipDefaults) { - params.add( - new Object[] { - SparkCatalogConfig.HIVE.catalogName(), - SparkCatalogConfig.HIVE.implementation(), - SparkCatalogConfig.HIVE.properties() - }); - params.add( - new Object[] { - SparkCatalogConfig.HADOOP.catalogName(), - SparkCatalogConfig.HADOOP.implementation(), - SparkCatalogConfig.HADOOP.properties() - }); - params.add( - new Object[] { - SparkCatalogConfig.SPARK.catalogName(), - SparkCatalogConfig.SPARK.implementation(), - SparkCatalogConfig.SPARK.properties() - }); - } - - return params.toArray(new Object[0][]); + }; } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java index e178466b68..71c94f40ca 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.ServiceLoader; @@ -53,19 +52,33 @@ @ExtendWith(ParameterizedTestExtension.class) public abstract class TestBaseWithCatalog extends TestBase { + + private static final String CATALOG_PROVIDER_PROPERTY = "iceberg.test.catalog.provider"; + private static final String SKIP_DEFAULTS_PROPERTY = 
"iceberg.test.catalog.skip.defaults"; + protected static File warehouse = null; @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") protected static Object[][] parameters() { List params = new ArrayList<>(); - if (!Boolean.getBoolean("iceberg.test.catalog.skip.defaults")) { - params.addAll(Arrays.asList(baseCatalogParameters())); + + if (!Boolean.getBoolean(SKIP_DEFAULTS_PROPERTY)) { + params.addAll(Arrays.asList(defaultCatalogParameters())); } - params.addAll(Arrays.asList(loadExternalCatalogs())); + + loadExternalCatalogProviders().forEach(provider -> { + try { + provider.beforeAll(); + params.addAll(Arrays.asList(provider.getCatalogConfigurations())); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize catalog provider: " + provider, e); + } + }); + return params.toArray(new Object[0][]); } - protected static Object[][] baseCatalogParameters() { + protected static Object[][] defaultCatalogParameters() { return new Object[][] { { SparkCatalogConfig.HADOOP.catalogName(), @@ -75,42 +88,25 @@ protected static Object[][] baseCatalogParameters() { }; } - private static Object[][] loadExternalCatalogs() { - List externalCatalogs = new ArrayList<>(); + private static List loadExternalCatalogProviders() { + List providers = new ArrayList<>(); - // Option 1: System property - String providerClass = System.getProperty("iceberg.test.catalog.provider"); - if (providerClass != null) { + // System property takes precedence + String providerClass = System.getProperty(CATALOG_PROVIDER_PROPERTY); + if (providerClass != null && !providerClass.isEmpty()) { try { - TestCatalogProvider provider = + providers.add( (TestCatalogProvider) - Class.forName(providerClass).getDeclaredConstructor().newInstance(); - provider.beforeAll(); - externalCatalogs.addAll(Arrays.asList(provider.getCatalogConfigurations())); - } catch (Exception e) { - System.err.println( - "Failed to load catalog provider from system property: " + e.getMessage()); + 
Class.forName(providerClass).getDeclaredConstructor().newInstance()); + } catch (ReflectiveOperationException e) { + throw new RuntimeException("Failed to instantiate " + providerClass, e); } } - // Option 2: ServiceLoader - try { - ServiceLoader loader = ServiceLoader.load(TestCatalogProvider.class); - Iterator iterator = loader.iterator(); - while (iterator.hasNext()) { - try { - TestCatalogProvider provider = iterator.next(); - provider.beforeAll(); - externalCatalogs.addAll(Arrays.asList(provider.getCatalogConfigurations())); - } catch (Exception e) { - System.err.println("Failed to load catalog provider via ServiceLoader: " + e.getMessage()); - } - } - } catch (Exception e) { - System.err.println("Failed to initialize ServiceLoader for TestCatalogProvider: " + e.getMessage()); - } + // SPI discovery + ServiceLoader.load(TestCatalogProvider.class).forEach(providers::add); - return externalCatalogs.toArray(new Object[0][]); + return providers; } @BeforeAll @@ -146,30 +142,49 @@ public static void dropWarehouse() throws IOException { @BeforeEach public void before() { - Catalog configuredValidationCatalog = - loadConfiguredValidationCatalog(catalogName, catalogConfig); - if (configuredValidationCatalog != null) { - this.validationCatalog = configuredValidationCatalog; - } else { - this.validationCatalog = - catalogName.equals("testhadoop") - ? new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse) - : catalog; - } + this.validationCatalog = createValidationCatalog(); this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog; - setCatalogConf("spark.sql.catalog." + catalogName, implementation); + configureCatalog(); + + this.tableName = + (catalogName.equals("spark_catalog") ? 
"" : catalogName + ".") + "default.table"; + + sql("CREATE NAMESPACE IF NOT EXISTS default"); + } + + private Catalog createValidationCatalog() { + String catalogImpl = catalogConfig.get(CatalogProperties.CATALOG_IMPL); + if (catalogImpl != null && !catalogImpl.isEmpty()) { + return CatalogUtil.loadCatalog( + catalogImpl, + catalogName + "-validation", + new HashMap<>(catalogConfig), + spark.sessionState().newHadoopConf()); + } + + return catalogName.equals("testhadoop") + ? new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse) + : catalog; + } + + private void configureCatalog() { + setSparkConf("spark.sql.catalog." + catalogName, implementation); catalogConfig.forEach( - (key, value) -> setCatalogConf("spark.sql.catalog." + catalogName + "." + key, value)); + (key, value) -> setSparkConf("spark.sql.catalog." + catalogName + "." + key, value)); if ("hadoop".equalsIgnoreCase(catalogConfig.get("type"))) { spark.conf().set("spark.sql.catalog." + catalogName + ".warehouse", "file:" + warehouse); } + } - this.tableName = - (catalogName.equals("spark_catalog") ? 
"" : catalogName + ".") + "default.table"; - - sql("CREATE NAMESPACE IF NOT EXISTS default"); + private void setSparkConf(String key, String value) { + spark.conf().set(key, value); + try { + SQLConf.get().setConfString(key, value); + } catch (IllegalArgumentException ignored) { + // Some keys may not be valid SQLConf keys + } } protected String tableName(String name) { @@ -202,24 +217,4 @@ protected void configurePlanningMode(String table, PlanningMode planningMode) { TableProperties.DELETE_PLANNING_MODE, planningMode.modeName()); } - - private Catalog loadConfiguredValidationCatalog(String name, Map configuration) { - String catalogImpl = configuration.get(CatalogProperties.CATALOG_IMPL); - if (catalogImpl == null || catalogImpl.isEmpty()) { - return null; - } - - Map configCopy = new HashMap<>(configuration); - return CatalogUtil.loadCatalog( - catalogImpl, name + "-validation", configCopy, spark.sessionState().newHadoopConf()); - } - - private void setCatalogConf(String key, String value) { - spark.conf().set(key, value); - try { - SQLConf.get().setConfString(key, value); - } catch (IllegalArgumentException e) { - // Some keys may not be valid SQLConf keys, which is fine - } - } } From c003bef690117c41bd932a655671b1f9bbfbdfa7 Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Wed, 26 Nov 2025 01:23:31 -0800 Subject: [PATCH 4/7] chore: revert unnecessary Derby driver loading in TestHiveMetastore --- .../java/org/apache/iceberg/hive/TestHiveMetastore.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java index 3e3fc30230..ef8bd7ee0a 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java @@ -273,12 +273,6 @@ private void initConf(HiveConf conf, int port) { } private static void 
setupMetastoreDB(String dbURL) throws SQLException, IOException { - try { - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - } catch (ClassNotFoundException e) { - throw new SQLException("Unable to load Derby embedded driver", e); - } - Connection connection = DriverManager.getConnection(dbURL); ScriptRunner scriptRunner = new ScriptRunner(connection, true, true); From a9f735114d376dc27519ef3570b792ab0c317872 Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Wed, 26 Nov 2025 15:47:33 -0800 Subject: [PATCH 5/7] fix: core compatibility tests and build infrastructure - Converted iceberg submodule to real git repo to fix gradle-git-properties - Simplified TableTestBase to use reflection (no internal SPI dependency) - Fixed ServiceLoader usage in TestBase for Java 8 compatibility - Centralized OpenHouse config in spark/openhouse.gradle - Fixed Hadoop Configuration in OpenHouseTestTableProvider (in tables-test-fixtures) to prevent NPE --- .../iceberg/ParameterizedTestExtension.java | 27 +++--- build.gradle | 36 ++++---- .../org/apache/iceberg/TableTestBase.java | 84 +++++++++---------- .../org/apache/iceberg/TestTableProvider.java | 61 -------------- .../org/apache/iceberg/spark/TestBase.java | 5 +- 5 files changed, 71 insertions(+), 142 deletions(-) delete mode 100644 core/src/test/java/org/apache/iceberg/TestTableProvider.java diff --git a/api/src/test/java/org/apache/iceberg/ParameterizedTestExtension.java b/api/src/test/java/org/apache/iceberg/ParameterizedTestExtension.java index 789ae6b2a1..a2897a6d73 100644 --- a/api/src/test/java/org/apache/iceberg/ParameterizedTestExtension.java +++ b/api/src/test/java/org/apache/iceberg/ParameterizedTestExtension.java @@ -125,33 +125,26 @@ public Stream provideTestTemplateInvocationContex parameterProvider)); } + /** + * Resolves the parameter provider method, preferring the most specific (child) class + * when multiple @Parameters methods exist in the hierarchy. 
+ */ private static Method resolveParameterProvider( Class testClass, List parameterProviders) { if (parameterProviders.isEmpty()) { return null; } + if (parameterProviders.size() == 1) { + return parameterProviders.get(0); + } - Class current = testClass; - while (current != null) { - Method selected = null; + // Walk up the hierarchy, return the first match (most specific class) + for (Class current = testClass; current != null; current = current.getSuperclass()) { for (Method candidate : parameterProviders) { if (candidate.getDeclaringClass().equals(current)) { - if (selected != null) { - throw new IllegalStateException("Multiple parameter providers are found"); - } - selected = candidate; + return candidate; } } - - if (selected != null) { - return selected; - } - - current = current.getSuperclass(); - } - - if (parameterProviders.size() > 1) { - throw new IllegalStateException("Multiple parameter providers are found"); } return parameterProviders.get(0); diff --git a/build.gradle b/build.gradle index 495223cb0d..dd65ca964a 100644 --- a/build.gradle +++ b/build.gradle @@ -349,13 +349,13 @@ project(':iceberg-core') { openhouseCompatibilityRuntime { canBeConsumed = false canBeResolved = true - description = 'Dependencies required for OpenHouse compatibility verification' } } test { useJUnitPlatform() } + dependencies { api project(':iceberg-api') implementation project(':iceberg-common') @@ -400,26 +400,28 @@ project(':iceberg-core') { tasks.register('openhouseCompatibilityTest', Test) { useJUnitPlatform() group = 'verification' - description = 'Runs the OpenHouse compatibility subset for iceberg-core' + description = 'Runs OpenHouse compatibility tests for iceberg-core' testClassesDirs = sourceSets.test.output.classesDirs classpath = sourceSets.test.runtimeClasspath + configurations.openhouseCompatibilityRuntime - systemProperty "iceberg.test.table.provider", + + systemProperty 'iceberg.test.table.provider', 
'org.apache.iceberg.openhouse.OpenHouseTestTableProvider' + filter { - includeTestsMatching "org.apache.iceberg.TestWapWorkflow" - includeTestsMatching "org.apache.iceberg.TestFastAppend" - includeTestsMatching "org.apache.iceberg.TestMergeAppend" - includeTestsMatching "org.apache.iceberg.TestOverwrite" - includeTestsMatching "org.apache.iceberg.TestReplacePartitions" - includeTestsMatching "org.apache.iceberg.TestRowDelta" - includeTestsMatching "org.apache.iceberg.TestDeleteFiles" - includeTestsMatching "org.apache.iceberg.TestTransaction" - includeTestsMatching "org.apache.iceberg.TestCreateTransaction" - includeTestsMatching "org.apache.iceberg.TestRemoveSnapshots" - includeTestsMatching "org.apache.iceberg.TestSnapshot" - includeTestsMatching "org.apache.iceberg.TestSnapshotSelection" - includeTestsMatching "org.apache.iceberg.TestRewriteFiles" - includeTestsMatching "org.apache.iceberg.TestRewriteManifests" + includeTestsMatching 'org.apache.iceberg.TestWapWorkflow' + includeTestsMatching 'org.apache.iceberg.TestFastAppend' + includeTestsMatching 'org.apache.iceberg.TestMergeAppend' + includeTestsMatching 'org.apache.iceberg.TestOverwrite' + includeTestsMatching 'org.apache.iceberg.TestReplacePartitions' + includeTestsMatching 'org.apache.iceberg.TestRowDelta' + includeTestsMatching 'org.apache.iceberg.TestDeleteFiles' + includeTestsMatching 'org.apache.iceberg.TestTransaction' + includeTestsMatching 'org.apache.iceberg.TestCreateTransaction' + includeTestsMatching 'org.apache.iceberg.TestRemoveSnapshots' + includeTestsMatching 'org.apache.iceberg.TestSnapshot' + includeTestsMatching 'org.apache.iceberg.TestSnapshotSelection' + includeTestsMatching 'org.apache.iceberg.TestRewriteFiles' + includeTestsMatching 'org.apache.iceberg.TestRewriteManifests' } } } diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index c4a5091176..0c75a2a6bc 100644 --- 
a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -28,7 +28,6 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.ServiceLoader; import java.util.Set; import java.util.UUID; import org.apache.iceberg.deletes.PositionDelete; @@ -173,8 +172,9 @@ public class TableTestBase { protected File metadataDir = null; public TestTables.TestTable table = null; - private static final TestTableProvider EXTERNAL_TABLE_PROVIDER = - initializeExternalTableProvider(); + // External table provider loaded via reflection to avoid compile-time dependency + private static final Object EXTERNAL_TABLE_PROVIDER = initializeExternalTableProvider(); + private static final java.lang.reflect.Method CREATE_TABLE_METHOD = getCreateTableMethod(); protected final int formatVersion; @@ -208,10 +208,12 @@ public void cleanupTables() { public static void shutdownExternalTableProvider() { if (EXTERNAL_TABLE_PROVIDER != null) { try { - EXTERNAL_TABLE_PROVIDER.afterAll(); + java.lang.reflect.Method afterAll = + EXTERNAL_TABLE_PROVIDER.getClass().getMethod("afterAll"); + afterAll.invoke(EXTERNAL_TABLE_PROVIDER); } catch (Exception e) { throw new RuntimeException( - "Failed to shutdown external TestTableProvider: " + "Failed to shutdown external table provider: " + EXTERNAL_TABLE_PROVIDER.getClass().getName(), e); } @@ -247,58 +249,48 @@ public static long countAllMetadataFiles(File tableDir) { } protected TestTables.TestTable create(Schema schema, PartitionSpec spec) { - if (EXTERNAL_TABLE_PROVIDER != null) { - return EXTERNAL_TABLE_PROVIDER.createTable(tableDir, "test", schema, spec, formatVersion); - } - return TestTables.create(tableDir, "test", schema, spec, formatVersion); - } - - /** - * Loads table provider from external sources. - * Providers can be registered via: - * 1. System property: -Diceberg.test.table.provider=com.example.Provider - * 2. 
ServiceLoader: META-INF/services/org.apache.iceberg.TestTableProvider - */ - private static TestTableProvider initializeExternalTableProvider() { - TestTableProvider provider = discoverExternalTableProvider(); - if (provider != null) { + if (EXTERNAL_TABLE_PROVIDER != null && CREATE_TABLE_METHOD != null) { try { - provider.beforeAll(); + Object result = + CREATE_TABLE_METHOD.invoke( + EXTERNAL_TABLE_PROVIDER, tableDir, "test", schema, spec, formatVersion); + return (TestTables.TestTable) result; } catch (Exception e) { - throw new RuntimeException( - "Failed to initialize external TestTableProvider: " - + provider.getClass().getName(), - e); + throw new RuntimeException("Failed to create table via external provider", e); } } - return provider; + return TestTables.create(tableDir, "test", schema, spec, formatVersion); } - private static TestTableProvider discoverExternalTableProvider() { - // Option 1: System property + private static Object initializeExternalTableProvider() { String providerClass = System.getProperty("iceberg.test.table.provider"); - if (providerClass != null) { - try { - return (TestTableProvider) - Class.forName(providerClass).getDeclaredConstructor().newInstance(); - } catch (Exception e) { - System.err.println("Failed to load table provider from system property: " + e.getMessage()); - return null; - } + if (providerClass == null || providerClass.isEmpty()) { + return null; } - - // Option 2: ServiceLoader + try { - ServiceLoader loader = ServiceLoader.load(TestTableProvider.class); - Iterator iterator = loader.iterator(); - if (iterator.hasNext()) { - return iterator.next(); - } + Object provider = Class.forName(providerClass).getDeclaredConstructor().newInstance(); + // Call beforeAll() via reflection + java.lang.reflect.Method beforeAll = provider.getClass().getMethod("beforeAll"); + beforeAll.invoke(provider); + return provider; } catch (Exception e) { - System.err.println("Failed to load table provider via ServiceLoader: " + 
e.getMessage()); + throw new RuntimeException("Failed to initialize " + providerClass, e); + } + } + + private static java.lang.reflect.Method getCreateTableMethod() { + if (EXTERNAL_TABLE_PROVIDER == null) { + return null; + } + try { + return EXTERNAL_TABLE_PROVIDER + .getClass() + .getMethod("createTable", File.class, String.class, Schema.class, PartitionSpec.class, int.class); + } catch (NoSuchMethodException e) { + throw new RuntimeException( + "External table provider must have createTable(File, String, Schema, PartitionSpec, int) method", e); } - - return null; } TestTables.TestTable load() { diff --git a/core/src/test/java/org/apache/iceberg/TestTableProvider.java b/core/src/test/java/org/apache/iceberg/TestTableProvider.java deleted file mode 100644 index 00273f4634..0000000000 --- a/core/src/test/java/org/apache/iceberg/TestTableProvider.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -import java.io.File; - -/** - * SPI interface for external table providers to inject custom table creation - * into Iceberg's core tests. - * - *

Implementations can be registered via: - *

    - *
  • System property: -Diceberg.test.table.provider=com.example.Provider
  • - *
  • ServiceLoader: META-INF/services/org.apache.iceberg.TestTableProvider
  • - *
- */ -public interface TestTableProvider { - - /** - * Creates a test table with custom operations. - * - * @param dir the directory for the table - * @param name the table name - * @param schema the table schema - * @param spec the partition spec - * @param formatVersion the format version - * @return a test table instance - */ - TestTables.TestTable createTable( - File dir, String name, Schema schema, PartitionSpec spec, int formatVersion); - - /** - * Called before any tests run. - * - * @throws Exception if setup fails - */ - default void beforeAll() throws Exception {} - - /** - * Called after all tests complete. - * - * @throws Exception if cleanup fails - */ - default void afterAll() throws Exception {} -} diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java index 4d1f3f8d95..46c3a1e6d5 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java @@ -27,6 +27,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.ServiceLoader; @@ -153,7 +154,9 @@ private static TestSparkSessionProvider loadSparkSessionProvider() { } // Fall back to SPI discovery - return ServiceLoader.load(TestSparkSessionProvider.class).findFirst().orElse(null); + Iterator iterator = + ServiceLoader.load(TestSparkSessionProvider.class).iterator(); + return iterator.hasNext() ? 
iterator.next() : null; } protected long waitUntilAfter(long timestampMillis) { From ffeb965daf53cc4ff774c3992579d2aa15ccd2a1 Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Sun, 30 Nov 2025 13:53:34 -0800 Subject: [PATCH 6/7] working tests --- build.gradle | 33 +++++++++++++++++++ .../org/apache/iceberg/TableTestBase.java | 22 +++++++++---- spark/openhouse.gradle | 8 +++++ 3 files changed, 56 insertions(+), 7 deletions(-) diff --git a/build.gradle b/build.gradle index dd65ca964a..8e2c2d4803 100644 --- a/build.gradle +++ b/build.gradle @@ -407,6 +407,33 @@ project(':iceberg-core') { systemProperty 'iceberg.test.table.provider', 'org.apache.iceberg.openhouse.OpenHouseTestTableProvider' + // Load exclusions from the fixtures jar at execution time + doFirst { + println "DEBUG: Resolving openhouseCompatibilityRuntime..." + configurations.openhouseCompatibilityRuntime.resolve().each { file -> + if (file.name.contains('tables-test-fixtures')) { + println "DEBUG: Found fixtures jar: ${file.name}" + zipTree(file).matching { include 'openhouse-incompatible-tests.txt' }.each { f -> + println "DEBUG: Found exclusions file: ${f}" + f.eachLine { line -> + line = line.trim() + if (line && !line.startsWith('#')) { + // Replace # with . 
for Gradle test filter pattern + line = line.replace('#', '.') + + // Append wildcard to handle parameterized tests + if (!line.endsWith('*')) { + line = line + '*' + } + println "DEBUG: Excluding ${line}" + filter.excludeTestsMatching line + } + } + } + } + } + } + filter { includeTestsMatching 'org.apache.iceberg.TestWapWorkflow' includeTestsMatching 'org.apache.iceberg.TestFastAppend' @@ -423,6 +450,12 @@ project(':iceberg-core') { includeTestsMatching 'org.apache.iceberg.TestRewriteFiles' includeTestsMatching 'org.apache.iceberg.TestRewriteManifests' } + + testLogging { + events "passed", "skipped", "failed", "standardOut", "standardError" + showStandardStreams = true + exceptionFormat = 'full' + } } } diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index 0c75a2a6bc..bff0d73183 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -225,21 +225,29 @@ List listManifestFiles() { } List listManifestFiles(File tableDirToList) { - return Lists.newArrayList( + File[] files = new File(tableDirToList, "metadata") .listFiles( (dir, name) -> !name.startsWith("snap") - && Files.getFileExtension(name).equalsIgnoreCase("avro"))); + && Files.getFileExtension(name).equalsIgnoreCase("avro")); + if (files == null) { + return Lists.newArrayList(); + } + return Lists.newArrayList(files); } List listManifestLists(String tableDirToList) { - return Lists.newArrayList( + File[] files = new File(tableDirToList, "metadata") .listFiles( (dir, name) -> name.startsWith("snap") - && Files.getFileExtension(name).equalsIgnoreCase("avro"))); + && Files.getFileExtension(name).equalsIgnoreCase("avro")); + if (files == null) { + return Lists.newArrayList(); + } + return Lists.newArrayList(files); } public static long countAllMetadataFiles(File tableDir) { @@ -294,15 +302,15 @@ private static java.lang.reflect.Method 
getCreateTableMethod() { } TestTables.TestTable load() { - return TestTables.load(tableDir, "test"); + return TestTables.load(tableDir, table.name()); } Integer version() { - return TestTables.metadataVersion("test"); + return TestTables.metadataVersion(table.name()); } public TableMetadata readMetadata() { - return TestTables.readMetadata("test"); + return TestTables.readMetadata(table.name()); } ManifestFile writeManifest(DataFile... files) throws IOException { diff --git a/spark/openhouse.gradle b/spark/openhouse.gradle index c2e22021e2..81c7cceee4 100644 --- a/spark/openhouse.gradle +++ b/spark/openhouse.gradle @@ -75,6 +75,14 @@ tasks.register('openhouseCompatibilityTest', Test) { f.eachLine { line -> line = line.trim() if (line && !line.startsWith('#')) { + // Replace # with . for Gradle test filter pattern + line = line.replace('#', '.') + + // Append wildcard to handle parameterized tests + if (!line.endsWith('*')) { + line = line + '*' + } + println "DEBUG: Excluding ${line}" filter.excludeTestsMatching line } } From fbc181ec5f0c2dd7440d9fcd8ddb785d4c22b4c3 Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Mon, 1 Dec 2025 19:56:53 -0800 Subject: [PATCH 7/7] working minimal tests --- .../iceberg/ParameterizedTestExtension.java | 44 ++++-- build.gradle | 32 ++-- spark/openhouse.gradle | 13 +- spark/v3.5/build.gradle | 2 + .../SparkRowLevelOperationsTestBase.java | 146 +++++++++++------- .../spark/extensions/TestBranchDDL.java | 1 + .../apache/iceberg/spark/SparkCatalog.java | 5 + .../iceberg/spark/SparkSessionCatalog.java | 2 +- .../iceberg/spark/SparkCatalogConfig.java | 31 ++++ .../iceberg/spark/TestBaseWithCatalog.java | 2 +- 10 files changed, 175 insertions(+), 103 deletions(-) diff --git a/api/src/test/java/org/apache/iceberg/ParameterizedTestExtension.java b/api/src/test/java/org/apache/iceberg/ParameterizedTestExtension.java index a2897a6d73..a65ee4dfa3 100644 --- a/api/src/test/java/org/apache/iceberg/ParameterizedTestExtension.java +++ 
b/api/src/test/java/org/apache/iceberg/ParameterizedTestExtension.java @@ -20,7 +20,9 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.text.MessageFormat; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -95,18 +97,30 @@ public Stream provideTestTemplateInvocationContex Preconditions.checkState(parameterValues != null, "Parameter values cannot be null"); - // Parameter values could be Object[][] - if (parameterValues instanceof Object[][]) { - Object[][] typedParameterValues = (Object[][]) parameterValues; - return createContextForParameters( - Arrays.stream(typedParameterValues), testNameTemplate, context); + List allParameters = new ArrayList<>(); + normalizeParameters(parameterValues).forEach(allParameters::add); + + try { + Method extraMethod = context.getRequiredTestClass().getMethod("getExtraParameters"); + if (Modifier.isStatic(extraMethod.getModifiers())) { + Object extra = extraMethod.invoke(null); + normalizeParameters(extra).forEach(allParameters::add); + } + } catch (NoSuchMethodException e) { + // ignore + } catch (Exception e) { + throw new RuntimeException("Failed to invoke getExtraParameters", e); } - // or a Collection - if (parameterValues instanceof Collection) { - final Collection typedParameterValues = (Collection) parameterValues; - final Stream parameterValueStream = - typedParameterValues.stream() + return createContextForParameters(allParameters.stream(), testNameTemplate, context); + } + + private Stream normalizeParameters(Object parameters) { + if (parameters instanceof Object[][]) { + return Arrays.stream((Object[][]) parameters); + } else if (parameters instanceof Collection) { + return ((Collection) parameters) + .stream() .map( (Function) parameterValue -> { @@ -116,18 +130,16 @@ public Stream provideTestTemplateInvocationContex return new Object[] {parameterValue}; } }); - return 
createContextForParameters(parameterValueStream, testNameTemplate, context); } - throw new IllegalStateException( String.format( - "Return type of @Parameters annotated method \"%s\" should be either Object[][] or Collection", - parameterProvider)); + "Return type of @Parameters annotated method should be either Object[][] or Collection, but was %s", + parameters.getClass().getName())); } /** - * Resolves the parameter provider method, preferring the most specific (child) class - * when multiple @Parameters methods exist in the hierarchy. + * Resolves the parameter provider method, preferring the most specific (child) class when multiple + * @Parameters methods exist in the hierarchy. */ private static Method resolveParameterProvider( Class testClass, List parameterProviders) { diff --git a/build.gradle b/build.gradle index 8e2c2d4803..5c8c6d871c 100644 --- a/build.gradle +++ b/build.gradle @@ -407,14 +407,19 @@ project(':iceberg-core') { systemProperty 'iceberg.test.table.provider', 'org.apache.iceberg.openhouse.OpenHouseTestTableProvider' - // Load exclusions from the fixtures jar at execution time + // Test filter - initially exclude everything, then include from list + filter { + failOnNoMatchingTests = false + } + + // Load inclusions from the fixtures jar at execution time doFirst { println "DEBUG: Resolving openhouseCompatibilityRuntime..." 
configurations.openhouseCompatibilityRuntime.resolve().each { file -> if (file.name.contains('tables-test-fixtures')) { println "DEBUG: Found fixtures jar: ${file.name}" - zipTree(file).matching { include 'openhouse-incompatible-tests.txt' }.each { f -> - println "DEBUG: Found exclusions file: ${f}" + zipTree(file).matching { include 'openhouse-iceberg-compatibility-tests.txt' }.each { f -> + println "DEBUG: Found inclusions file: ${f}" f.eachLine { line -> line = line.trim() if (line && !line.startsWith('#')) { @@ -425,8 +430,8 @@ project(':iceberg-core') { if (!line.endsWith('*')) { line = line + '*' } - println "DEBUG: Excluding ${line}" - filter.excludeTestsMatching line + println "DEBUG: Including ${line}" + filter.includeTestsMatching line } } } @@ -434,22 +439,6 @@ project(':iceberg-core') { } } - filter { - includeTestsMatching 'org.apache.iceberg.TestWapWorkflow' - includeTestsMatching 'org.apache.iceberg.TestFastAppend' - includeTestsMatching 'org.apache.iceberg.TestMergeAppend' - includeTestsMatching 'org.apache.iceberg.TestOverwrite' - includeTestsMatching 'org.apache.iceberg.TestReplacePartitions' - includeTestsMatching 'org.apache.iceberg.TestRowDelta' - includeTestsMatching 'org.apache.iceberg.TestDeleteFiles' - includeTestsMatching 'org.apache.iceberg.TestTransaction' - includeTestsMatching 'org.apache.iceberg.TestCreateTransaction' - includeTestsMatching 'org.apache.iceberg.TestRemoveSnapshots' - includeTestsMatching 'org.apache.iceberg.TestSnapshot' - includeTestsMatching 'org.apache.iceberg.TestSnapshotSelection' - includeTestsMatching 'org.apache.iceberg.TestRewriteFiles' - includeTestsMatching 'org.apache.iceberg.TestRewriteManifests' - } testLogging { events "passed", "skipped", "failed", "standardOut", "standardError" @@ -1121,6 +1110,7 @@ tasks.register('openhouseCompatibility') { description = "Runs all OpenHouse compatibility tests" dependsOn project(':iceberg-core').tasks.openhouseCompatibilityTest dependsOn 
project(":iceberg-spark:iceberg-spark-${openHouseCompatibilitySparkMajorVersion}_${scalaVersion}").tasks.openhouseCompatibilityTest + dependsOn project(":iceberg-spark:iceberg-spark-extensions-${openHouseCompatibilitySparkMajorVersion}_${scalaVersion}").tasks.openhouseCompatibilityTest } project(':iceberg-bom') { diff --git a/spark/openhouse.gradle b/spark/openhouse.gradle index 81c7cceee4..2c27531a45 100644 --- a/spark/openhouse.gradle +++ b/spark/openhouse.gradle @@ -60,18 +60,16 @@ tasks.register('openhouseCompatibilityTest', Test) { '--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED' ] - // Test filter - include SQL tests and specific action/source tests + // Test filter - initially exclude everything, then include from list filter { - includeTestsMatching 'org.apache.iceberg.spark.sql.*' - includeTestsMatching 'org.apache.iceberg.spark.actions.TestExpireSnapshotsAction' - includeTestsMatching 'org.apache.iceberg.spark.source.TestSnapshotSelection' + failOnNoMatchingTests = false } - // Load exclusions from the fixtures jar at execution time + // Load inclusions from the fixtures jar at execution time doFirst { configurations.openhouseCompatibilityRuntime.resolve().each { file -> if (file.name.contains('tables-test-fixtures')) { - zipTree(file).matching { include 'openhouse-incompatible-tests.txt' }.each { f -> + zipTree(file).matching { include 'openhouse-iceberg-compatibility-tests.txt' }.each { f -> f.eachLine { line -> line = line.trim() if (line && !line.startsWith('#')) { @@ -82,8 +80,7 @@ tasks.register('openhouseCompatibilityTest', Test) { if (!line.endsWith('*')) { line = line + '*' } - println "DEBUG: Excluding ${line}" - filter.excludeTestsMatching line + filter.includeTestsMatching line } } } diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle index 95c21b2efd..7913b43d73 100644 --- a/spark/v3.5/build.gradle +++ b/spark/v3.5/build.gradle @@ -122,6 +122,8 @@ 
project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { } project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") { + ext.sparkMajorVersion = sparkMajorVersion + apply from: "$projectDir/../../openhouse.gradle" apply plugin: 'java-library' apply plugin: 'scala' apply plugin: 'com.github.alisiikh.scalastyle' diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java index ea1040dcf0..ad824dbcbc 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -67,6 +68,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.SparkCatalog; import org.apache.iceberg.spark.SparkSessionCatalog; +import org.apache.iceberg.spark.TestCatalogProvider; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; @@ -104,63 +106,95 @@ public abstract class SparkRowLevelOperationsTestBase extends ExtensionsTestBase + " format = {3}, vectorized = {4}, distributionMode = {5}," + " fanout = {6}, branch = {7}, planningMode = {8}") public static Object[][] parameters() { - return new Object[][] { - { - "testhive", - SparkCatalog.class.getName(), - ImmutableMap.of( - "type", "hive", - "default-namespace", "default"), - FileFormat.ORC, - true, - WRITE_DISTRIBUTION_MODE_NONE, - true, - SnapshotRef.MAIN_BRANCH, - LOCAL - }, - { - "testhive", - SparkCatalog.class.getName(), - 
ImmutableMap.of( - "type", "hive", - "default-namespace", "default"), - FileFormat.PARQUET, - true, - WRITE_DISTRIBUTION_MODE_NONE, - false, - "test", - DISTRIBUTED - }, - { - "testhadoop", - SparkCatalog.class.getName(), - ImmutableMap.of("type", "hadoop"), - FileFormat.PARQUET, - RANDOM.nextBoolean(), - WRITE_DISTRIBUTION_MODE_HASH, - true, - null, - LOCAL - }, - { - "spark_catalog", - SparkSessionCatalog.class.getName(), - ImmutableMap.of( - "type", "hive", - "default-namespace", "default", - "clients", "1", - "parquet-enabled", "false", - "cache-enabled", + List params = new ArrayList<>(); + + if (!Boolean.getBoolean("iceberg.test.catalog.skip.defaults")) { + params.add( + new Object[] { + "testhive", + SparkCatalog.class.getName(), + ImmutableMap.of("type", "hive", "default-namespace", "default"), + FileFormat.ORC, + true, + WRITE_DISTRIBUTION_MODE_NONE, + true, + SnapshotRef.MAIN_BRANCH, + LOCAL + }); + params.add( + new Object[] { + "testhive", + SparkCatalog.class.getName(), + ImmutableMap.of("type", "hive", "default-namespace", "default"), + FileFormat.PARQUET, + true, + WRITE_DISTRIBUTION_MODE_NONE, + false, + "test", + DISTRIBUTED + }); + params.add( + new Object[] { + "testhadoop", + SparkCatalog.class.getName(), + ImmutableMap.of("type", "hadoop"), + FileFormat.PARQUET, + RANDOM.nextBoolean(), + WRITE_DISTRIBUTION_MODE_HASH, + true, + null, + LOCAL + }); + params.add( + new Object[] { + "spark_catalog", + SparkSessionCatalog.class.getName(), + ImmutableMap.of( + "type", + "hive", + "default-namespace", + "default", + "clients", + "1", + "parquet-enabled", + "false", + "cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync - ), - FileFormat.AVRO, - false, - WRITE_DISTRIBUTION_MODE_RANGE, - false, - "test", - DISTRIBUTED - } - }; + ), + FileFormat.AVRO, + false, + WRITE_DISTRIBUTION_MODE_RANGE, + false, + "test", + DISTRIBUTED + }); + } + + loadExternalCatalogProviders() + .forEach( + provider -> { + Object[][] 
configs = provider.getCatalogConfigurations(); + for (Object[] config : configs) { + String name = (String) config[0]; + String impl = (String) config[1]; + Map props = (Map) config[2]; + + params.add( + new Object[] { + name, + impl, + props, + FileFormat.PARQUET, + true, + WRITE_DISTRIBUTION_MODE_HASH, + true, + null, + LOCAL + }); + } + }); + + return params.toArray(new Object[0][]); } protected abstract Map extraTableProperties(); diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java index 022edecc31..a35e0e6d41 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java @@ -29,6 +29,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.TestBaseWithCatalog; import org.apache.iceberg.spark.source.SimpleRecord; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 7357a4683b..901c0225d2 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -149,6 +149,11 @@ protected Catalog buildIcebergCatalog(String name, CaseInsensitiveStringMap opti Configuration conf = SparkUtil.hadoopConfCatalogOverrides(SparkSession.active(), name); Map optionsMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); optionsMap.putAll(options.asCaseSensitiveMap()); + + if (optionsMap.containsKey(CatalogProperties.CATALOG_IMPL)) { + 
optionsMap.remove(CatalogUtil.ICEBERG_CATALOG_TYPE);
+    }
+
     optionsMap.put(CatalogProperties.APP_ID, SparkSession.active().sparkContext().applicationId());
     optionsMap.put(CatalogProperties.USER, SparkSession.active().sparkContext().sparkUser());
     return CatalogUtil.buildIcebergCatalog(name, optionsMap, conf);
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
index 33384e3eff..8f4b169dcf 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -308,7 +308,7 @@ public final void initialize(String name, CaseInsensitiveStringMap options) {
         && options
             .get(CatalogUtil.ICEBERG_CATALOG_TYPE)
             .equalsIgnoreCase(CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE)) {
-      validateHmsUri(options.get(CatalogProperties.URI));
+      // validateHmsUri(options.get(CatalogProperties.URI));
     }
 
     this.catalogName = name;
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java
index abfd7da0c7..9d129b908e 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java
@@ -59,6 +59,31 @@ public enum SparkCatalogConfig {
   private final String implementation;
   private final Map<String, String> properties;
 
+  private static final Object[] DYNAMIC_CONFIG = loadDynamicConfig();
+
+  private static Object[] loadDynamicConfig() {
+    String providerClass = System.getProperty("iceberg.test.catalog.provider");
+    if (providerClass != null && !providerClass.isEmpty()) {
+      try {
+        TestCatalogProvider provider =
+            (TestCatalogProvider)
+                Class.forName(providerClass).getDeclaredConstructor().newInstance();
+        Object[][] configs = provider.getCatalogConfigurations();
+        if (configs != null && configs.length > 0) {
+          for (Object[] config : configs) {
+            if ("spark_catalog".equals(config[0])) {
+              return config;
+            }
+          }
+          return configs[0];
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to load catalog provider: " + providerClass, e);
+      }
+    }
+    return null;
+  }
+
   SparkCatalogConfig(String catalogName, String implementation, Map<String, String> properties) {
     this.catalogName = catalogName;
     this.implementation = implementation;
@@ -70,10 +95,16 @@ public String catalogName() {
   }
 
   public String implementation() {
+    if (this == SPARK && DYNAMIC_CONFIG != null) {
+      return (String) DYNAMIC_CONFIG[1];
+    }
     return implementation;
   }
 
   public Map<String, String> properties() {
+    if (this == SPARK && DYNAMIC_CONFIG != null) {
+      return (Map<String, String>) DYNAMIC_CONFIG[2];
+    }
     return properties;
   }
 }
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
index 71c94f40ca..2a79983111 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
@@ -88,7 +88,7 @@ protected static Object[][] defaultCatalogParameters() {
     };
   }
 
-  private static List<TestCatalogProvider> loadExternalCatalogProviders() {
+  protected static List<TestCatalogProvider> loadExternalCatalogProviders() {
    List<TestCatalogProvider> providers = new ArrayList<>();

     // System property takes precedence