diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 4f252572fb22d..4c0ec0f8ecdb0 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -137,7 +137,7 @@
+ files="(AbstractHerder|DistributedHerder|KafkaBasedLog|WorkerSourceTask)Test.java"/>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java
index ae9fe3b99e0ed..aa56f2d03872c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java
@@ -50,7 +50,8 @@ private void validate(String connectorName, VersionRange range) throws ConnectEx
String version = range == null ? LATEST_VERSION : range.toString();
if (invalidVersions.containsKey(connectorName) && invalidVersions.get(connectorName).containsKey(version)) {
- throw new VersionedPluginLoadingException(invalidVersions.get(connectorName).get(version).getMessage());
+ VersionedPluginLoadingException exception = invalidVersions.get(connectorName).get(version);
+ throw new VersionedPluginLoadingException(exception.getMessage(), exception.availableVersions());
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 02caa21812f6b..d1851138ecd78 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -43,6 +43,7 @@
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
@@ -405,6 +406,37 @@ public void testConfigValidationMultipleNullConfig() {
verifyValidationIsolation();
}
+ @Test
+ public void testRepeatedConfigValidationOfInvalidVersionSurfacesAvailableVersions() {
+ Class extends Connector> connectorClass = SampleSourceConnector.class;
+ String requestedVersion = "2.0.0";
+ List availableVersions = List.of("1.0.0");
+
+ when(worker.getPlugins()).thenReturn(plugins);
+ when(plugins.newConnector(anyString(), any()))
+ .thenThrow(new VersionedPluginLoadingException("no matching version", availableVersions));
+
+ AbstractHerder herder = testHerder();
+
+ Map config = new HashMap<>();
+ config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
+ config.put(ConnectorConfig.CONNECTOR_VERSION, requestedVersion);
+
+ ConfigInfos first = herder.validateConnectorConfig(config, s -> null, false);
+ ConfigInfos second = herder.validateConnectorConfig(config, s -> null, false);
+
+ ConfigInfo firstVersionInfo = findInfo(first, ConnectorConfig.CONNECTOR_VERSION);
+ assertNotNull(firstVersionInfo);
+ assertEquals(availableVersions, firstVersionInfo.configValue().recommendedValues());
+
+ ConfigInfo secondVersionInfo = findInfo(second, ConnectorConfig.CONNECTOR_VERSION);
+ assertNotNull(secondVersionInfo);
+ assertEquals(availableVersions, secondVersionInfo.configValue().recommendedValues());
+
+ // The second validation comes from the cache, so the loader is only invoked once.
+ verify(plugins, times(1)).newConnector(eq(connectorClass.getName()), any());
+ }
+
@Test
public void testBuildRestartPlanForConnectorAndTasks() {
RestartRequest restartRequest = new RestartRequest(connectorName, false, true);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/CachedConnectorsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/CachedConnectorsTest.java
new file mode 100644
index 0000000000000..903eb69d50c56
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/CachedConnectorsTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.runtime.isolation.PluginUtils;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;
+
+import org.apache.maven.artifact.versioning.VersionRange;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class CachedConnectorsTest {
+
+ @Mock
+ private Plugins plugins;
+
+ @Test
+ public void testCachedInvalidVersionFailurePreservesAvailableVersions() throws Exception {
+ String requestedVersion = "2.0.0";
+ String connectorClass = "org.apache.kafka.connect.runtime.SomeConnector";
+ List availableVersions = List.of("1.0.0");
+ VersionedPluginLoadingException loadingException =
+ new VersionedPluginLoadingException("no matching version", availableVersions);
+ when(plugins.newConnector(anyString(), any())).thenThrow(loadingException);
+
+ CachedConnectors cachedConnectors = new CachedConnectors(plugins);
+ VersionRange versionRange = PluginUtils.connectorVersionRequirement(requestedVersion);
+
+ VersionedPluginLoadingException firstException = assertThrows(
+ VersionedPluginLoadingException.class,
+ () -> cachedConnectors.getConnector(connectorClass, versionRange)
+ );
+ VersionedPluginLoadingException cachedException = assertThrows(
+ VersionedPluginLoadingException.class,
+ () -> cachedConnectors.getConnector(connectorClass, versionRange)
+ );
+
+ // A cached version loading failure should preserve its exception details.
+ assertEquals(firstException.getMessage(), cachedException.getMessage());
+ assertEquals(availableVersions, cachedException.availableVersions());
+
+ // The second lookup comes from the cache, so the loader is only invoked once.
+ verify(plugins, times(1)).newConnector(connectorClass, versionRange);
+ }
+}