diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java index aaaa9bb..558d379 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java @@ -20,6 +20,7 @@ import com.pinterest.psc.common.TopicUri; import com.pinterest.psc.config.PscConfiguration; +import com.pinterest.psc.config.PscConfigurationUtils; import com.pinterest.psc.metadata.TopicUriMetadata; import com.pinterest.psc.metadata.client.PscMetadataClient; import org.apache.flink.annotation.VisibleForTesting; @@ -184,12 +185,24 @@ private static int getTopicPartitionCount(List topicUris, Properties psc PscMetadataClient metadataClient = null; try { - // Create PSC configuration from properties - PscConfiguration pscConfig = new PscConfiguration(); - for (String key : pscProperties.stringPropertyNames()) { - pscConfig.setProperty(key, pscProperties.getProperty(key)); + // Create a copy of properties to avoid modifying the original + Properties metadataClientProps = new Properties(); + metadataClientProps.putAll(pscProperties); + + // Set the required psc.metadata.client.id if not already present. + // This is required by PscMetadataClient validation (see PscConfigurationInternal.validateMetadataClientConfiguration). + // PscSourceEnumerator also sets this before creating its metadata client. + if (!metadataClientProps.containsKey(PscConfiguration.PSC_METADATA_CLIENT_ID)) { + metadataClientProps.setProperty( + PscConfiguration.PSC_METADATA_CLIENT_ID, + "psc-table-partition-count-query"); } + // Convert properties to PSC configuration. + // When passed to PscMetadataClient, it will be wrapped in PscConfigurationInternal + // which loads psc.conf defaults and validates the configuration. + PscConfiguration pscConfig = PscConfigurationUtils.propertiesToPscConfiguration(metadataClientProps); + metadataClient = new PscMetadataClient(pscConfig); int minPartitionCount = Integer.MAX_VALUE; diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtilsTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtilsTest.java index 90c7b62..c709e64 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtilsTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtilsTest.java @@ -18,6 +18,10 @@ package com.pinterest.flink.streaming.connectors.psc.table; +import com.pinterest.psc.config.PscConfiguration; +import com.pinterest.psc.config.PscConfigurationInternal; +import com.pinterest.psc.config.PscConfigurationUtils; +import com.pinterest.psc.exception.startup.ConfigurationException; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.junit.After; @@ -30,6 +34,7 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_ENABLE_RESCALE; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; /** * Comprehensive test suite for PscTableCommonUtils, focusing on rescale decision logic @@ -355,5 +360,29 @@ public void testProviderResetRestoresDefaultBehavior() { tableOptions, globalConfig, topicUris, pscProperties, null); assertThat(resultAfterReset).isFalse(); // partition count = -1, fail-safe } + + // ============================================ + // Tests validating PscMetadataClient configuration fix + // ============================================ + + @Test + public void testMetadataClientValidationPassesWithClientId() { + // This test proves the FIX works: + // With psc.metadata.client.id set, PscConfigurationInternal validation passes + + // Given: Properties WITH psc.metadata.client.id (matching our fix) + Properties propsWithClientId = new Properties(); + propsWithClientId.setProperty( + PscConfiguration.PSC_METADATA_CLIENT_ID, + "psc-table-partition-count-query" + ); + + PscConfiguration pscConfig = PscConfigurationUtils.propertiesToPscConfiguration(propsWithClientId); + + // When/Then: Creating PscConfigurationInternal for metadata client does NOT throw + assertThatCode(() -> + new PscConfigurationInternal(pscConfig, PscConfigurationInternal.PSC_CLIENT_TYPE_METADATA) + ).doesNotThrowAnyException(); + } }