From 3c6dde317adb7c5a8cf62ab2629c2bd535d21462 Mon Sep 17 00:00:00 2001 From: Kevin Browne Date: Fri, 9 Jan 2026 11:45:03 -0500 Subject: [PATCH 1/3] fix rescale configs --- .../connectors/psc/table/PscTableCommonUtils.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java index aaaa9bb..ffcdff6 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java @@ -20,6 +20,7 @@ import com.pinterest.psc.common.TopicUri; import com.pinterest.psc.config.PscConfiguration; +import com.pinterest.psc.config.PscConfigurationUtils; import com.pinterest.psc.metadata.TopicUriMetadata; import com.pinterest.psc.metadata.client.PscMetadataClient; import org.apache.flink.annotation.VisibleForTesting; @@ -184,11 +185,11 @@ private static int getTopicPartitionCount(List topicUris, Properties psc PscMetadataClient metadataClient = null; try { - // Create PSC configuration from properties - PscConfiguration pscConfig = new PscConfiguration(); - for (String key : pscProperties.stringPropertyNames()) { - pscConfig.setProperty(key, pscProperties.getProperty(key)); - } + // Create PSC configuration using the proper utility method. + // This ensures psc.conf is loaded and all default configurations + // (environment provider, SSL certs, etc.) are properly initialized, + // matching the behavior of PscSourceEnumerator at runtime. + PscConfiguration pscConfig = PscConfigurationUtils.propertiesToPscConfiguration(pscProperties); metadataClient = new PscMetadataClient(pscConfig); From d571fb2f75c5289f53f95cf8da0717feeed34e94 Mon Sep 17 00:00:00 2001 From: Kevin Browne Date: Fri, 9 Jan 2026 11:54:01 -0500 Subject: [PATCH 2/3] update to ID --- .../psc/table/PscTableCommonUtils.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java index ffcdff6..558d379 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java @@ -185,11 +185,23 @@ private static int getTopicPartitionCount(List topicUris, Properties psc PscMetadataClient metadataClient = null; try { - // Create PSC configuration using the proper utility method. - // This ensures psc.conf is loaded and all default configurations - // (environment provider, SSL certs, etc.) are properly initialized, - // matching the behavior of PscSourceEnumerator at runtime. - PscConfiguration pscConfig = PscConfigurationUtils.propertiesToPscConfiguration(pscProperties); + // Create a copy of properties to avoid modifying the original + Properties metadataClientProps = new Properties(); + metadataClientProps.putAll(pscProperties); + + // Set the required psc.metadata.client.id if not already present. + // This is required by PscMetadataClient validation (see PscConfigurationInternal.validateMetadataClientConfiguration). + // PscSourceEnumerator also sets this before creating its metadata client. + if (!metadataClientProps.containsKey(PscConfiguration.PSC_METADATA_CLIENT_ID)) { + metadataClientProps.setProperty( + PscConfiguration.PSC_METADATA_CLIENT_ID, + "psc-table-partition-count-query"); + } + + // Convert properties to PSC configuration. + // When passed to PscMetadataClient, it will be wrapped in PscConfigurationInternal + // which loads psc.conf defaults and validates the configuration. + PscConfiguration pscConfig = PscConfigurationUtils.propertiesToPscConfiguration(metadataClientProps); metadataClient = new PscMetadataClient(pscConfig); From 055b7664738bffe3995a2c31e4b5396f583f8b89 Mon Sep 17 00:00:00 2001 From: Kevin Browne Date: Fri, 9 Jan 2026 12:53:57 -0500 Subject: [PATCH 3/3] add test --- .../psc/table/PscTableCommonUtilsTest.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtilsTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtilsTest.java index 90c7b62..c709e64 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtilsTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtilsTest.java @@ -18,6 +18,10 @@ package com.pinterest.flink.streaming.connectors.psc.table; +import com.pinterest.psc.config.PscConfiguration; +import com.pinterest.psc.config.PscConfigurationInternal; +import com.pinterest.psc.config.PscConfigurationUtils; +import com.pinterest.psc.exception.startup.ConfigurationException; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.junit.After; @@ -30,6 +34,7 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_ENABLE_RESCALE; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; /** * Comprehensive test suite for PscTableCommonUtils, focusing on rescale decision logic @@ -355,5 +360,29 @@ public void testProviderResetRestoresDefaultBehavior() { tableOptions, globalConfig, topicUris, pscProperties, null); assertThat(resultAfterReset).isFalse(); // partition count = -1, fail-safe } + + // ============================================ + // Tests validating PscMetadataClient configuration fix + // ============================================ + + @Test + public void testMetadataClientValidationPassesWithClientId() { + // This test proves the FIX works: + // With psc.metadata.client.id set, PscConfigurationInternal validation passes + + // Given: Properties WITH psc.metadata.client.id (matching our fix) + Properties propsWithClientId = new Properties(); + propsWithClientId.setProperty( + PscConfiguration.PSC_METADATA_CLIENT_ID, + "psc-table-partition-count-query" + ); + + PscConfiguration pscConfig = PscConfigurationUtils.propertiesToPscConfiguration(propsWithClientId); + + // When/Then: Creating PscConfigurationInternal for metadata client does NOT throw + assertThatCode(() -> + new PscConfigurationInternal(pscConfig, PscConfigurationInternal.PSC_CLIENT_TYPE_METADATA) + ).doesNotThrowAnyException(); + } }