From c65c991ed659311c526126ebbbe40196653e00f6 Mon Sep 17 00:00:00 2001 From: Alice Date: Fri, 26 Jun 2026 11:49:40 -0400 Subject: [PATCH] Added support for dynamic SSL certificate reload for controllers --- .../kafka/clients/admin/ConfigEntry.java | 20 +- .../kafka/common/config/ConfigResource.java | 1 + .../requests/DescribeConfigsResponse.java | 4 +- .../scala/kafka/network/SocketServer.scala | 32 +-- .../scala/kafka/raft/KafkaRaftManager.scala | 6 +- .../kafka/server/ConfigAdminManager.scala | 6 +- .../scala/kafka/server/ConfigHelper.scala | 27 ++- .../scala/kafka/server/ControllerApis.scala | 2 +- .../ControllerConfigurationValidator.scala | 6 +- .../scala/kafka/server/ControllerServer.scala | 41 ++++ .../controller/DynamicControllerConfig.scala | 76 +++++++ .../DynamicControllerSslListener.scala | 95 +++++++++ .../ControllerDynamicSslReloadTest.scala | 185 ++++++++++++++++++ .../DynamicControllerConfigTest.scala | 91 +++++++++ .../ConfigurationControlManager.java | 56 ++++++ .../ControllerListenerReconfigurable.java | 48 +++++ .../ControllerDynamicConfigPublisher.java | 84 ++++++++ .../ConfigurationControlManagerTest.java | 51 +++++ .../kafka/server/config/ConfigType.java | 1 + .../config/DefaultSupportedConfigChecker.java | 3 + .../org/apache/kafka/tools/ConfigCommand.java | 38 ++-- 21 files changed, 831 insertions(+), 42 deletions(-) create mode 100644 core/src/main/scala/kafka/server/controller/DynamicControllerConfig.scala create mode 100644 core/src/main/scala/kafka/server/controller/DynamicControllerSslListener.scala create mode 100644 core/src/test/scala/integration/kafka/server/controller/ControllerDynamicSslReloadTest.scala create mode 100644 core/src/test/scala/unit/kafka/server/controller/DynamicControllerConfigTest.scala create mode 100644 metadata/src/main/java/org/apache/kafka/controller/util/ControllerListenerReconfigurable.java create mode 100644 metadata/src/main/java/org/apache/kafka/image/publisher/ControllerDynamicConfigPublisher.java diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java index cde99ffb85229..0337bdd548466 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java @@ -213,15 +213,17 @@ public enum ConfigType { * Source of configuration entries. */ public enum ConfigSource { - DYNAMIC_TOPIC_CONFIG, // dynamic topic config that is configured for a specific topic - DYNAMIC_BROKER_LOGGER_CONFIG, // dynamic broker logger config that is configured for a specific broker - DYNAMIC_BROKER_CONFIG, // dynamic broker config that is configured for a specific broker - DYNAMIC_DEFAULT_BROKER_CONFIG, // dynamic broker config that is configured as default for all brokers in the cluster - DYNAMIC_CLIENT_METRICS_CONFIG, // dynamic client metrics subscription config that is configured for all clients - DYNAMIC_GROUP_CONFIG, // dynamic group config that is configured for a specific group - STATIC_BROKER_CONFIG, // static broker config provided as broker properties at start up (e.g. server.properties file) - DEFAULT_CONFIG, // built-in default configuration for configs that have a default value - UNKNOWN // source unknown e.g. in the ConfigEntry used for alter requests where source is not set + DYNAMIC_TOPIC_CONFIG, // dynamic topic config that is configured for a specific topic + DYNAMIC_BROKER_LOGGER_CONFIG, // dynamic broker logger config that is configured for a specific broker + DYNAMIC_BROKER_CONFIG, // dynamic broker config that is configured for a specific broker + DYNAMIC_DEFAULT_BROKER_CONFIG, // dynamic broker config that is configured as default for all brokers in the cluster + DYNAMIC_CONTROLLER_CONFIG, // dynamic controller config that is configured for a specific controller + DYNAMIC_DEFAULT_CONTROLLER_CONFIG, // dynamic controller config that is configured as default for all controllers in the cluster + DYNAMIC_CLIENT_METRICS_CONFIG, // dynamic client metrics subscription config that is configured for all clients + DYNAMIC_GROUP_CONFIG, // dynamic group config that is configured for a specific group + STATIC_BROKER_CONFIG, // static broker config provided as broker properties at start up (e.g. server.properties file) + DEFAULT_CONFIG, // built-in default configuration for configs that have a default value + UNKNOWN // source unknown e.g. in the ConfigEntry used for alter requests where source is not set } /** diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java index 4c082aff6704d..05ae33821816c 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java @@ -33,6 +33,7 @@ public final class ConfigResource { * Type of resource. */ public enum Type { + CONTROLLER((byte) 64), GROUP((byte) 32), CLIENT_METRICS((byte) 16), BROKER_LOGGER((byte) 8), diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java index f291af273b84a..bd428d0f61c6d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java @@ -120,7 +120,9 @@ public enum ConfigSource { DEFAULT_CONFIG((byte) 5, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DEFAULT_CONFIG), DYNAMIC_BROKER_LOGGER_CONFIG((byte) 6, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG), CLIENT_METRICS_CONFIG((byte) 7, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG), - GROUP_CONFIG((byte) 8, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG); + GROUP_CONFIG((byte) 8, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG), + DYNAMIC_CONTROLLER_CONFIG((byte) 9, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_CONTROLLER_CONFIG), + DYNAMIC_DEFAULT_CONTROLLER_CONFIG((byte) 10, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_CONTROLLER_CONFIG); final byte id; private final org.apache.kafka.clients.admin.ConfigEntry.ConfigSource source; diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 50fe55427bf55..954cf1afba603 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -428,6 +428,12 @@ class DataPlaneAcceptor(socketServer: SocketServer, } } + /** + * Returns the processors for this acceptor. + * Used by controller dynamic config to access channel builders. + */ + def getProcessors: Seq[Processor] = processors.toSeq + /** * Reconfigures this instance with the given key-value pairs. The provided * map contains all configs including any reconfigurable configs that @@ -806,7 +812,7 @@ private[kafka] class Processor( connectionQuotas: ConnectionQuotas, connectionsMaxIdleMs: Long, failedAuthenticationDelayMs: Int, - listenerName: ListenerName, + val listenerName: ListenerName, securityProtocol: SecurityProtocol, config: KafkaConfig, metrics: Metrics, @@ -850,20 +856,20 @@ private[kafka] class Processor( private val expiredConnectionsKilledCountMetricName = metrics.metricName("expired-connections-killed-count", JSocketServer.METRICS_GROUP, metricTags) metrics.addMetric(expiredConnectionsKilledCountMetricName, expiredConnectionsKilledCount) - private[network] val selector = createSelector( - ChannelBuilders.serverChannelBuilder( - listenerName, - listenerName == config.interBrokerListenerName, - securityProtocol, - config, - credentialProvider.credentialCache, - credentialProvider.tokenCache, - time, - logContext, - version => apiVersionManager.apiVersionResponse(0, version < 4) - ) + private[kafka] val channelBuilder: ChannelBuilder = ChannelBuilders.serverChannelBuilder( + listenerName, + listenerName == config.interBrokerListenerName, + securityProtocol, + config, + credentialProvider.credentialCache, + credentialProvider.tokenCache, + time, + logContext, + version => apiVersionManager.apiVersionResponse(0, version < 4) ) + private[network] val selector = createSelector(channelBuilder) + // Visible to override for testing protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = { channelBuilder match { diff --git a/core/src/main/scala/kafka/raft/KafkaRaftManager.scala b/core/src/main/scala/kafka/raft/KafkaRaftManager.scala index 0f35210090afa..4619e83daba74 100644 --- a/core/src/main/scala/kafka/raft/KafkaRaftManager.scala +++ b/core/src/main/scala/kafka/raft/KafkaRaftManager.scala @@ -30,7 +30,7 @@ import org.apache.kafka.common.KafkaException import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.Uuid import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector} +import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector} import org.apache.kafka.common.protocol.ApiMessage import org.apache.kafka.common.requests.RequestContext import org.apache.kafka.common.requests.RequestHeader @@ -126,6 +126,7 @@ class KafkaRaftManager[T]( } override val raftLog: RaftLog = buildMetadataLog() + private var raftClientChannelBuilder: ChannelBuilder = _ private val netChannel = buildNetworkChannel() private val expirationTimer = new SystemTimer("raft-expiration-executor") private val expirationService = new TimingWheelExpirationService(expirationTimer) @@ -162,6 +163,8 @@ class KafkaRaftManager[T]( clientDriver.handleRequest(context, header, request, createdTimeMs) } + def clientChannelBuilder: ChannelBuilder = raftClientChannelBuilder + private def buildRaftClient(): KafkaRaftClient[T] = { new KafkaRaftClient( OptionalInt.of(config.nodeId), @@ -217,6 +220,7 @@ class KafkaRaftManager[T]( time, logContext ) + raftClientChannelBuilder = channelBuilder val metricGroupPrefix = "raft-channel" val collectPerConnectionMetrics = false diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala b/core/src/main/scala/kafka/server/ConfigAdminManager.scala index 1b0215316019e..b327e3ba0bb50 100644 --- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala +++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala @@ -24,7 +24,7 @@ import kafka.utils._ import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.common.config.ConfigDef.ConfigKey -import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, CLIENT_METRICS, GROUP, TOPIC} +import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, CLIENT_METRICS, CONTROLLER, GROUP, TOPIC} import org.apache.kafka.common.config.{ConfigDef, ConfigResource} import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidRequestException} import org.apache.kafka.common.message.{AlterConfigsRequestData, AlterConfigsResponseData, IncrementalAlterConfigsRequestData, IncrementalAlterConfigsResponseData} @@ -144,7 +144,7 @@ class ConfigAdminManager(nodeId: Int, validateResourceNameIsCurrentNodeId(resource.resourceName()) } validateBrokerConfigChange(resource, configResource) - case TOPIC | CLIENT_METRICS | GROUP => + case TOPIC | CLIENT_METRICS | GROUP | CONTROLLER => // Nothing to do. case _ => throw new InvalidRequestException(s"Unknown resource type ${resource.resourceType().toInt}") @@ -243,7 +243,7 @@ class ConfigAdminManager(nodeId: Int, validateResourceNameIsCurrentNodeId(resource.resourceName()) } validateBrokerConfigChange(resource, configResource) - case TOPIC | CLIENT_METRICS | GROUP => + case TOPIC | CLIENT_METRICS | GROUP | CONTROLLER => // Nothing to do. case _ => // Since legacy AlterConfigs does not support BROKER_LOGGER, any attempt to use it diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala b/core/src/main/scala/kafka/server/ConfigHelper.scala index 304d8c155411a..e16301dd0b037 100644 --- a/core/src/main/scala/kafka/server/ConfigHelper.scala +++ b/core/src/main/scala/kafka/server/ConfigHelper.scala @@ -52,7 +52,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo val describeConfigsRequest = request.body(classOf[DescribeConfigsRequest]) val (authorizedResources, unauthorizedResources) = describeConfigsRequest.data.resources.asScala.partition { resource => ConfigResource.Type.forId(resource.resourceType) match { - case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER | ConfigResource.Type.CLIENT_METRICS => + case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER | ConfigResource.Type.CLIENT_METRICS | ConfigResource.Type.CONTROLLER => authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME) case ConfigResource.Type.TOPIC => authHelper.authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.resourceName) @@ -64,7 +64,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo val authorizedConfigs = describeConfigs(authorizedResources.toList, describeConfigsRequest.data.includeSynonyms, describeConfigsRequest.data.includeDocumentation) val unauthorizedConfigs = unauthorizedResources.map { resource => val error = ConfigResource.Type.forId(resource.resourceType) match { - case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER | ConfigResource.Type.CLIENT_METRICS => Errors.CLUSTER_AUTHORIZATION_FAILED + case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER | ConfigResource.Type.CLIENT_METRICS | ConfigResource.Type.CONTROLLER => Errors.CLUSTER_AUTHORIZATION_FAILED case ConfigResource.Type.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED case ConfigResource.Type.GROUP => Errors.GROUP_AUTHORIZATION_FAILED case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}") @@ -136,6 +136,29 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo createResponseConfig(resource, groupConfig, createGroupConfigEntry(groupConfig, groupProps, includeSynonyms, includeDocumentation)(_, _)) } + case ConfigResource.Type.CONTROLLER => + val controllerProps = configRepository.config( + new ConfigResource(ConfigResource.Type.CONTROLLER, resource.resourceName)) + val isDefault = resource.resourceName == null || resource.resourceName.isEmpty + val sourceId = + if (isDefault) ConfigSource.DYNAMIC_DEFAULT_CONTROLLER_CONFIG.id + else ConfigSource.DYNAMIC_CONTROLLER_CONFIG.id + val entries = controllerProps.entrySet.asScala.map { e => + val name = e.getKey.toString + val value = e.getValue.toString + val isSensitive = name.toLowerCase.contains("password") + new DescribeConfigsResponseData.DescribeConfigsResourceResult() + .setName(name) + .setValue(if (isSensitive) null else value) + .setConfigSource(sourceId) + .setIsSensitive(isSensitive) + .setReadOnly(false) + .setSynonyms(Collections.emptyList()) + }.toList.asJava + new DescribeConfigsResponseData.DescribeConfigsResult() + .setErrorCode(Errors.NONE.code) + .setConfigs(entries) + case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType") } configResult.setResourceName(resource.resourceName).setResourceType(resource.resourceType) diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 77ca25598662b..37a94a24b2461 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -472,7 +472,7 @@ class ControllerApis( private def authorizeAlterResource(requestContext: RequestContext, resource: ConfigResource): ApiError = { resource.`type` match { - case ConfigResource.Type.BROKER | ConfigResource.Type.CLIENT_METRICS => + case ConfigResource.Type.BROKER | ConfigResource.Type.CLIENT_METRICS | ConfigResource.Type.CONTROLLER => if (authHelper.authorize(requestContext, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) { new ApiError(NONE) } else { diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala index 7fdf7b00a98b0..2714d0c76ddf1 100644 --- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala +++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala @@ -19,7 +19,7 @@ package kafka.server import java.util import org.apache.kafka.common.config.ConfigResource -import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, GROUP, TOPIC} +import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, CONTROLLER, GROUP, TOPIC} import org.apache.kafka.controller.ConfigurationValidator import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException} import org.apache.kafka.common.internals.Topic @@ -113,6 +113,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu resource.`type`() match { case TOPIC => validateTopicName(resource.name()) case BROKER => validateBrokerName(resource.name()) + case CONTROLLER => validateBrokerName(resource.name()) case _ => throwExceptionForUnknownResourceType(resource) } } @@ -129,6 +130,9 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu LogConfig.validate(oldConfigs, filteredConfigs, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) case BROKER => validateBrokerName(resource.name()) + case CONTROLLER => + validateBrokerName(resource.name()) + filterAndValidateNullConfigs(newConfigs, "controller") case CLIENT_METRICS => val filteredConfigs = filterAndValidateNullConfigs(newConfigs, "client metrics") ClientMetricsConfigs.validate(resource.name(), filteredConfigs) diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index e0e991cfbb58f..cb0e8235123bf 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -338,6 +338,47 @@ class ControllerServer( ), "controller")) + // Set up the controller dynamic config manager for SSL reconfiguration. + val dynamicControllerConfig = new kafka.server.controller.DynamicControllerConfig(config) + + import scala.jdk.CollectionConverters._ + config.controllerListenerNames.asScala.foreach { listenerName => + socketServer.dataPlaneAcceptor(listenerName).foreach { acceptor => + acceptor.getProcessors.foreach { processor => + processor.channelBuilder match { + case sslBuilder: org.apache.kafka.common.network.SslChannelBuilder => + val wrapper = new kafka.server.controller.DynamicControllerSslListener( + sslBuilder, + processor.listenerName + ) + dynamicControllerConfig.addReconfigurable(wrapper) + info(s"Registered SSL reconfigurable for controller inbound listener ${processor.listenerName}") + case _ => + } + } + } + } + + // Register the outbound raft client's SSL channel builder for SSL hot-reload. + Option(sharedServer.raftManager).foreach { raftManager => + raftManager.clientChannelBuilder match { + case sslBuilder: org.apache.kafka.common.network.SslChannelBuilder => + val controllerListenerName = new ListenerName(config.controllerListenerNames.get(0)) + val wrapper = new kafka.server.controller.DynamicControllerSslListener( + sslBuilder, + controllerListenerName + ) + dynamicControllerConfig.addReconfigurable(wrapper) + info(s"Registered SSL reconfigurable for raft outbound client (listener $controllerListenerName)") + case _ => + } + } + + metadataPublishers.add(new org.apache.kafka.image.publisher.ControllerDynamicConfigPublisher( + config.nodeId, + (configs: java.util.Map[String, String]) => dynamicControllerConfig.apply(configs) + )) + // Register this instance for dynamic config changes to the KafkaConfig. This must be called // after the authorizer and quotaManagers are initialized, since it references those objects. // It must be called before DynamicClientQuotaPublisher is installed, since otherwise we may diff --git a/core/src/main/scala/kafka/server/controller/DynamicControllerConfig.scala b/core/src/main/scala/kafka/server/controller/DynamicControllerConfig.scala new file mode 100644 index 0000000000000..6e84f18985b4e --- /dev/null +++ b/core/src/main/scala/kafka/server/controller/DynamicControllerConfig.scala @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server.controller + +import kafka.server.KafkaConfig +import kafka.utils.Logging +import org.apache.kafka.controller.util.ControllerListenerReconfigurable + +import java.util.concurrent.CopyOnWriteArrayList +import scala.jdk.CollectionConverters._ + +/** + * Controller-side dynamic config manager. + * Owns the current effective config map, the registry of reconfigurables, + * and dispatches config updates on metadata replay. + */ +class DynamicControllerConfig( + config: KafkaConfig +) extends Logging { + + private val reconfigurables = new CopyOnWriteArrayList[ControllerListenerReconfigurable]() + @volatile private var currentConfig: Map[String, String] = Map.empty + + def addReconfigurable(reconfigurable: ControllerListenerReconfigurable): Unit = { + reconfigurables.add(reconfigurable) + } + + def removeReconfigurable(reconfigurable: ControllerListenerReconfigurable): Unit = { + reconfigurables.remove(reconfigurable) + } + + /** + * Apply a new effective config map from the metadata log. + * This is called by ControllerDynamicConfigPublisher on each metadata update. + */ + def apply(newConfig: java.util.Map[String, String]): Unit = { + val newConfigScala = newConfig.asScala.toMap + + if (newConfigScala != currentConfig) { + info(s"Applying controller config update: ${newConfigScala.size} configs") + + val iterator = reconfigurables.iterator() + while (iterator.hasNext) { + val reconfigurable = iterator.next() + try { + reconfigurable.validateReconfiguration(newConfig) + reconfigurable.reconfigure(newConfig) + } catch { + case e: Exception => + error(s"Controller dynamic config update rejected for one reconfigurable; " + + s"previous configuration retained. Fix and re-alter.", e) + } + } + currentConfig = newConfigScala + } + } + + def currentValue(key: String): Option[String] = { + currentConfig.get(key) + } +} diff --git a/core/src/main/scala/kafka/server/controller/DynamicControllerSslListener.scala b/core/src/main/scala/kafka/server/controller/DynamicControllerSslListener.scala new file mode 100644 index 0000000000000..fb8e3fabccffd --- /dev/null +++ b/core/src/main/scala/kafka/server/controller/DynamicControllerSslListener.scala @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server.controller + +import kafka.utils.Logging +import org.apache.kafka.common.config.types.Password +import org.apache.kafka.common.network.{ChannelBuilder, ListenerName, SslChannelBuilder} +import org.apache.kafka.controller.util.ControllerListenerReconfigurable + +import java.util +import scala.jdk.CollectionConverters._ + +/** + * Wraps a SslChannelBuilder and implements ControllerListenerReconfigurable + * by delegating to the existing SslChannelBuilder.reconfigure() method. + */ +class DynamicControllerSslListener( + channelBuilder: ChannelBuilder, + listenerName: ListenerName +) extends ControllerListenerReconfigurable with Logging { + + private val sslChannelBuilder: Option[SslChannelBuilder] = channelBuilder match { + case ssl: SslChannelBuilder => Some(ssl) + case _ => None + } + + if (sslChannelBuilder.isEmpty) { + throw new IllegalArgumentException(s"ChannelBuilder for listener $listenerName is not an SslChannelBuilder") + } + + override def reconfigurableConfigs(): util.Set[String] = { + sslChannelBuilder.get.reconfigurableConfigs() + } + + override def validateReconfiguration(configs: util.Map[String, _]): Unit = { + val listenerConfigs = filterConfigsForListener(configs) + if (!listenerConfigs.isEmpty) { + sslChannelBuilder.get.validateReconfiguration(listenerConfigs) + } + } + + override def reconfigure(configs: util.Map[String, _]): Unit = { + val listenerConfigs = filterConfigsForListener(configs) + if (!listenerConfigs.isEmpty) { + info(s"Reconfiguring SSL for listener $listenerName with ${listenerConfigs.size()} configs") + sslChannelBuilder.get.reconfigure(listenerConfigs) + } + } + + /** + * Filter configs to only those relevant for this listener. + * Accepts bare ssl.* keys and listener.name..ssl.* keys. + */ + private def filterConfigsForListener(configs: util.Map[String, _]): util.Map[String, AnyRef] = { + val result = new util.HashMap[String, AnyRef]() + val listenerPrefix = s"listener.name.${listenerName.value().toLowerCase}." + + configs.asScala.foreach { case (key, value) => + val lowerKey = key.toLowerCase + if (lowerKey.startsWith("ssl.")) { + result.put(key, wrapIfPassword(key, value)) + } else if (lowerKey.startsWith(listenerPrefix)) { + val configKey = key.substring(listenerPrefix.length) + result.put(configKey, wrapIfPassword(configKey, value)) + } + } + + result + } + + /** + * SslFactory's reconfigurable validator/applier casts password values to + * `org.apache.kafka.common.config.types.Password`. + */ + private def wrapIfPassword(key: String, value: Any): AnyRef = value match { + case p: Password => p + case s: String if key.toLowerCase.contains("password") => new Password(s) + case other => other.asInstanceOf[AnyRef] + } +} diff --git a/core/src/test/scala/integration/kafka/server/controller/ControllerDynamicSslReloadTest.scala b/core/src/test/scala/integration/kafka/server/controller/ControllerDynamicSslReloadTest.scala new file mode 100644 index 0000000000000..76e325ca56541 --- /dev/null +++ b/core/src/test/scala/integration/kafka/server/controller/ControllerDynamicSslReloadTest.scala @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server.controller + +import kafka.server.QuorumTestHarness +import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry} +import org.apache.kafka.common.config.{ConfigResource, SslConfigs} +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.test.TestSslUtils +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout} + +import java.io.File +import java.security.cert.X509Certificate +import java.util.{Properties, Collections => JCollections} +import javax.net.ssl.{SSLContext, SSLSocket, TrustManager, X509TrustManager} +import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ + +/** + * Integration test for dynamic SSL reload on the CONTROLLER listener. + * + * Boots an isolated controller with SSL on the CONTROLLER listener using keystore A. + * Issues an `IncrementalAlterConfigs` against `ConfigResource.Type.CONTROLLER` pointing + * the listener at keystore B. Opens a fresh TLS connection and asserts that the certificate + * presented on the wire is now from keystore B. + */ +@Timeout(120) +class ControllerDynamicSslReloadTest extends QuorumTestHarness { + + private var keystoreA: File = _ + private var keystoreB: File = _ + private var truststore: File = _ + private var serialA: java.math.BigInteger = _ + private var serialB: java.math.BigInteger = _ + + private val KeystorePassword = "controller-test-password" + private val KeyPassword = "controller-test-password" + private val TruststorePassword = "controller-test-password" + + override protected val controllerListenerSecurityProtocol: SecurityProtocol = SecurityProtocol.SSL + + @BeforeEach + override def setUp(testInfo: TestInfo): Unit = { + + keystoreA = newKeystore("keyA") + keystoreB = newKeystore("keyB") + truststore = newTruststore("server-trust", List(("a", keystoreA), ("b", keystoreB))) + serialA = readSerial(keystoreA, "server") + serialB = readSerial(keystoreB, "server") + super.setUp(testInfo) + } + + @AfterEach + override def tearDown(): Unit = { + try super.tearDown() + finally Seq(keystoreA, keystoreB, truststore).filter(_ != null).foreach(f => Utils.delete(f)) + } + + override protected def kraftControllerConfigs(testInfo: TestInfo): Seq[Properties] = { + val props = new Properties() + val listenerPrefix = "listener.name.controller." + props.setProperty(listenerPrefix + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreA.getAbsolutePath) + props.setProperty(listenerPrefix + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, KeystorePassword) + props.setProperty(listenerPrefix + SslConfigs.SSL_KEY_PASSWORD_CONFIG, KeyPassword) + props.setProperty(listenerPrefix + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststore.getAbsolutePath) + props.setProperty(listenerPrefix + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, TruststorePassword) + props.setProperty(listenerPrefix + "ssl.client.auth", "none") + Seq(props) + } + + @Test + def testControllerListenerSslHotReload(): Unit = { + val controllerListener = new ListenerName("CONTROLLER") + val controllerPort = controllerServer.socketServer.boundPort(controllerListener) + val nodeId = controllerServer.config.nodeId + val baselineSerial = peerCertSerial("localhost", controllerPort) + assertEquals(serialA, baselineSerial, + s"Baseline cert on CONTROLLER listener should be keystore A's serial (expected=$serialA, got=$baselineSerial)") + + val admin = adminClient(controllerPort) + try { + val resource = new ConfigResource(ConfigResource.Type.CONTROLLER, nodeId.toString) + val ops = List( + new AlterConfigOp(new ConfigEntry("listener.name.controller." + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreB.getAbsolutePath), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry("listener.name.controller." + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, KeystorePassword), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry("listener.name.controller." + SslConfigs.SSL_KEY_PASSWORD_CONFIG, KeyPassword), AlterConfigOp.OpType.SET) + ).asJava + admin.incrementalAlterConfigs(JCollections.singletonMap(resource, ops)).all().get(30, java.util.concurrent.TimeUnit.SECONDS) + } finally { + admin.close() + } + + val deadline = System.currentTimeMillis() + 15.seconds.toMillis + var observed: java.math.BigInteger = null + while (System.currentTimeMillis() < deadline && observed != serialB) { + observed = peerCertSerial("localhost", controllerPort) + if (observed != serialB) Thread.sleep(200) + } + assertEquals(serialB, observed, + s"After --alter --entity-type controllers, CONTROLLER listener should serve keystore B's cert " + + s"(expected=$serialB, observed=$observed). The reload did not propagate to the SslChannelBuilder.") + } + + private def adminClient(controllerPort: Int): Admin = { + val props = new Properties() + props.setProperty(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, s"localhost:$controllerPort") + props.setProperty("security.protocol", SecurityProtocol.SSL.name) + props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststore.getAbsolutePath) + props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, TruststorePassword) + props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "") + Admin.create(props) + } + + private def peerCertSerial(host: String, port: Int): java.math.BigInteger = { + val ctx = SSLContext.getInstance("TLS") + val trustAll = Array[TrustManager](new X509TrustManager { + override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = () + override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = () + override def getAcceptedIssuers: Array[X509Certificate] = Array.empty + }) + ctx.init(null, trustAll, new java.security.SecureRandom()) + val socket = ctx.getSocketFactory.createSocket(host, port).asInstanceOf[SSLSocket] + try { + socket.startHandshake() + val certs = socket.getSession.getPeerCertificates + assertTrue(certs != null && certs.nonEmpty, "No peer cert presented") + certs(0).asInstanceOf[X509Certificate].getSerialNumber + } finally { + socket.close() + } + } + + private def newKeystore(alias: String): File = { + val file = java.io.File.createTempFile(s"ctrl-test-$alias-", ".jks") + file.deleteOnExit() + val keyPair = TestSslUtils.generateKeyPair("RSA") + val cert = new TestSslUtils.CertificateBuilder() + .sanDnsNames("localhost") + .generate("CN=localhost", keyPair) + TestSslUtils.createKeyStore(file.getAbsolutePath, new org.apache.kafka.common.config.types.Password(KeystorePassword), + new org.apache.kafka.common.config.types.Password(KeyPassword), "server", keyPair.getPrivate, cert) + file + } + + private def newTruststore(name: String, sources: List[(String, File)]): File = { + val file = java.io.File.createTempFile(s"ctrl-test-$name-", ".jks") + file.deleteOnExit() + val ts = java.security.KeyStore.getInstance("JKS") + ts.load(null, null) + sources.foreach { case (alias, ks) => + val src = java.security.KeyStore.getInstance("JKS") + val in = new java.io.FileInputStream(ks) + try src.load(in, KeystorePassword.toCharArray) finally in.close() + ts.setCertificateEntry(alias, src.getCertificate("server")) + } + val out = new java.io.FileOutputStream(file) + try ts.store(out, TruststorePassword.toCharArray) finally out.close() + file + } + + private def readSerial(keystore: File, alias: String): java.math.BigInteger = { + val ks = java.security.KeyStore.getInstance("JKS") + val in = new java.io.FileInputStream(keystore) + try ks.load(in, KeystorePassword.toCharArray) finally in.close() + ks.getCertificate(alias).asInstanceOf[X509Certificate].getSerialNumber + } +} diff --git a/core/src/test/scala/unit/kafka/server/controller/DynamicControllerConfigTest.scala b/core/src/test/scala/unit/kafka/server/controller/DynamicControllerConfigTest.scala new file mode 100644 index 0000000000000..2d4754bcf2a47 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/controller/DynamicControllerConfigTest.scala @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server.controller + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.controller.util.ControllerListenerReconfigurable +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +import java.util +import scala.collection.mutable + +class DynamicControllerConfigTest { + + private class StubReconfigurable extends ControllerListenerReconfigurable { + val validatedConfigs: mutable.ArrayBuffer[util.Map[String, _]] = mutable.ArrayBuffer.empty + val appliedConfigs: mutable.ArrayBuffer[util.Map[String, _]] = mutable.ArrayBuffer.empty + + override def reconfigurableConfigs(): util.Set[String] = util.Set.of("ssl.keystore.location") + override def validateReconfiguration(configs: util.Map[String, _]): Unit = validatedConfigs += configs + override def reconfigure(configs: util.Map[String, _]): Unit = appliedConfigs += configs + } + + private def newConfig(): KafkaConfig = { + val props = TestUtils.createBrokerConfig(0) + new KafkaConfig(props) + } + + @Test + def testApplyDispatchesToRegisteredReconfigurables(): Unit = { + val dynamicConfig = new DynamicControllerConfig(newConfig()) + val r1 = new StubReconfigurable + val r2 = new StubReconfigurable + dynamicConfig.addReconfigurable(r1) + dynamicConfig.addReconfigurable(r2) + + val configs = new util.HashMap[String, String]() + configs.put("ssl.keystore.location", "/tmp/keystore.jks") + dynamicConfig.apply(configs) + + assertEquals(1, r1.validatedConfigs.size) + assertEquals(1, r1.appliedConfigs.size) + assertEquals(1, r2.validatedConfigs.size) + assertEquals(1, r2.appliedConfigs.size) + assertEquals("/tmp/keystore.jks", dynamicConfig.currentValue("ssl.keystore.location").get) + } + + @Test + def testApplySkipsWhenConfigUnchanged(): Unit = { + val dynamicConfig = new DynamicControllerConfig(newConfig()) + val r = new StubReconfigurable + dynamicConfig.addReconfigurable(r) + + val configs = new util.HashMap[String, String]() + configs.put("ssl.keystore.location", "/tmp/keystore.jks") + dynamicConfig.apply(configs) + dynamicConfig.apply(configs) + + assertEquals(1, r.appliedConfigs.size, "Should not re-dispatch on identical config") + } + + @Test + def testRemoveReconfigurableStopsDispatch(): Unit = { + val dynamicConfig = new DynamicControllerConfig(newConfig()) + val r = new StubReconfigurable + dynamicConfig.addReconfigurable(r) + dynamicConfig.removeReconfigurable(r) + + val configs = new util.HashMap[String, String]() + configs.put("ssl.keystore.location", "/tmp/keystore.jks") + dynamicConfig.apply(configs) + + assertEquals(0, r.appliedConfigs.size) + } +} diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index 3ff214dfc70e2..00b8862e5ff73 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -279,6 +279,13 @@ private ApiError incrementalAlterConfigResource( List outputRecords, boolean forwarded ) { + if (configResource.type() == Type.CONTROLLER) { + ApiError controllerValidationError = validateControllerConfigs(configResource, keysToOps); + if (controllerValidationError.isFailure()) { + return controllerValidationError; + } + } + List newRecords = new ArrayList<>(); for (Entry> keysToOpsEntry : keysToOps.entrySet()) { String key = keysToOpsEntry.getKey(); @@ -484,6 +491,55 @@ boolean isDisallowedClusterMinIsrTransition(ConfigRecord configRecord) { return false; } + /** + * Validate CONTROLLER resource configurations. + * Only SSL reconfigurable configs with proper listener prefix are allowed. + * Resource name must be either empty (cluster-default) or a stringified node ID. + */ + private ApiError validateControllerConfigs( + ConfigResource configResource, + Map> keysToOps + ) { + java.util.Set sslReconfigurableConfigs = org.apache.kafka.common.config.SslConfigs.RECONFIGURABLE_CONFIGS; + + String resourceName = configResource.name(); + if (!resourceName.isEmpty()) { + try { + Integer.parseInt(resourceName); + } catch (NumberFormatException e) { + return new ApiError(INVALID_CONFIG, + "CONTROLLER resource name must be either empty (cluster-default) or a valid node ID, got: " + resourceName); + } + } + + // Validate config keys + for (String key : keysToOps.keySet()) { + boolean isValid = false; + + if (sslReconfigurableConfigs.contains(key)) { + isValid = true; + } else { + if (key.startsWith("listener.name.")) { + int thirdDot = key.indexOf('.', "listener.name.".length()); + if (thirdDot > 0 && thirdDot < key.length() - 1) { + String configWithoutPrefix = key.substring(thirdDot + 1); + if (sslReconfigurableConfigs.contains(configWithoutPrefix)) { + isValid = true; + } + } + } + } + + if (!isValid) { + return new ApiError(INVALID_CONFIG, + "Invalid config key for CONTROLLER resource: " + key + + ". Only SSL reconfigurable configs are allowed (with optional listener.name.. prefix)."); + } + } + + return ApiError.NONE; + } + /** * Determine the result of applying a batch of legacy configuration changes. Note * that this method does not change the contents of memory. It just generates a diff --git a/metadata/src/main/java/org/apache/kafka/controller/util/ControllerListenerReconfigurable.java b/metadata/src/main/java/org/apache/kafka/controller/util/ControllerListenerReconfigurable.java new file mode 100644 index 0000000000000..6e6406c6c63d4 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/util/ControllerListenerReconfigurable.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller.util; + +import java.util.Map; +import java.util.Set; + +/** + * Interface for controller components that support dynamic reconfiguration. + * This is controller-specific and distinct from the broker's BrokerReconfigurable. + */ +public interface ControllerListenerReconfigurable { + /** + * Returns the names of configurations that may be reconfigured. + */ + Set reconfigurableConfigs(); + + /** + * Validates that the provided configuration can be applied. + * Throws ConfigException if the new configuration is invalid. + * + * @param configs The new configuration to validate + */ + void validateReconfiguration(Map configs); + + /** + * Reconfigures this instance with the given key-value pairs. + * This method is called after validation has passed. + * + * @param configs The new configuration to apply + */ + void reconfigure(Map configs); +} diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/ControllerDynamicConfigPublisher.java b/metadata/src/main/java/org/apache/kafka/image/publisher/ControllerDynamicConfigPublisher.java new file mode 100644 index 0000000000000..0dbf6c7befcaf --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/image/publisher/ControllerDynamicConfigPublisher.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.publisher; + +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.loader.LoaderManifest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +/** + * Publishes CONTROLLER resource configuration changes from the metadata log + * to registered consumers (e.g., DynamicControllerConfig). + */ +public class ControllerDynamicConfigPublisher implements MetadataPublisher { + private static final Logger log = LoggerFactory.getLogger(ControllerDynamicConfigPublisher.class); + + private final int nodeId; + private final Consumer> configConsumer; + + public ControllerDynamicConfigPublisher( + int nodeId, + Consumer> configConsumer + ) { + this.nodeId = nodeId; + this.configConsumer = configConsumer; + } + + @Override + public String name() { + return "ControllerDynamicConfigPublisher"; + } + + @Override + public void onMetadataUpdate( + MetadataDelta delta, + MetadataImage newImage, + LoaderManifest manifest + ) { + if (delta.configsDelta() == null) { + return; + } + + Map effectiveConfig = new HashMap<>(); + ConfigResource clusterDefault = new ConfigResource(ConfigResource.Type.CONTROLLER, ""); + Map clusterDefaultConfigs = newImage.configs().configMapForResource(clusterDefault); + if (clusterDefaultConfigs != null) { + effectiveConfig.putAll(clusterDefaultConfigs); + } + + ConfigResource nodeResource = new ConfigResource(ConfigResource.Type.CONTROLLER, String.valueOf(nodeId)); + Map nodeConfigs = newImage.configs().configMapForResource(nodeResource); + if (nodeConfigs != null) { + effectiveConfig.putAll(nodeConfigs); + } + + if (!effectiveConfig.isEmpty() || delta.configsDelta().changes().containsKey(clusterDefault) || + delta.configsDelta().changes().containsKey(nodeResource)) { + log.info("Publishing controller config update for node {}: {} configs", nodeId, effectiveConfig.size()); + configConsumer.accept(effectiveConfig); + } + } + +} diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java index b1e5f4c8872ba..ca26e7b60671d 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java @@ -112,6 +112,7 @@ public class ConfigurationControlManagerTest { static final ConfigResource BROKER0 = new ConfigResource(BROKER, "0"); static final ConfigResource MYTOPIC = new ConfigResource(TOPIC, "mytopic"); + static final ConfigResource CONTROLLER1 = new ConfigResource(ConfigResource.Type.CONTROLLER, "1"); static class TestExistenceChecker implements Consumer { static final TestExistenceChecker INSTANCE = new TestExistenceChecker(); @@ -730,4 +731,54 @@ public void testIsCordonedLogDirsInvalid() { assertTrue(manager.isCordonedLogDirsInvalid(cr, null, false)); assertFalse(manager.isCordonedLogDirsInvalid(cr, null, true)); } + + @Test + public void testIncrementalAlterControllerConfigs() { + ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). + setFeatureControl(createFeatureControlManager()). + setKafkaConfigSchema(SCHEMA). + build(); + + ControllerResult> result = manager. + incrementalAlterConfigs(toMap(entry(CONTROLLER1, toMap( + entry("listener.name.CONTROLLER.ssl.keystore.location", entry(SET, "/tmp/keystore.jks")), + entry("listener.name.CONTROLLER.ssl.keystore.password", entry(SET, "secret"))))), + true, + false); + + ApiError error = result.response().get(CONTROLLER1); + if (error.isFailure()) { + System.out.println("Error: " + error.message()); + } + assertEquals(2, result.records().size()); + assertEquals(ApiError.NONE, error); + + ConfigResource controllerDefault = new ConfigResource(ConfigResource.Type.CONTROLLER, ""); + result = manager.incrementalAlterConfigs(toMap(entry(controllerDefault, toMap( + entry("ssl.truststore.location", entry(SET, "/tmp/truststore.jks"))))), + true, + false); + + assertEquals(1, result.records().size()); + assertEquals(ApiError.NONE, result.response().get(controllerDefault)); + + result = manager.incrementalAlterConfigs(toMap(entry(CONTROLLER1, toMap( + entry("some.invalid.config", entry(SET, "value"))))), + true, + false); + + assertEquals(0, result.records().size()); + assertEquals(Errors.INVALID_CONFIG, result.response().get(CONTROLLER1).error()); + assertTrue(result.response().get(CONTROLLER1).message().contains("Invalid config key")); + + ConfigResource invalidController = new ConfigResource(ConfigResource.Type.CONTROLLER, "not-a-number"); + result = manager.incrementalAlterConfigs(toMap(entry(invalidController, toMap( + entry("ssl.keystore.location", entry(SET, "/tmp/keystore.jks"))))), + true, + false); + + assertEquals(0, result.records().size()); + assertEquals(Errors.INVALID_CONFIG, result.response().get(invalidController).error()); + assertTrue(result.response().get(invalidController).message().contains("valid node ID")); + } } diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ConfigType.java b/server-common/src/main/java/org/apache/kafka/server/config/ConfigType.java index f2a57c0ed5a4c..a42d41564c168 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ConfigType.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ConfigType.java @@ -24,6 +24,7 @@ public enum ConfigType { CLIENT("clients"), USER("users"), BROKER("brokers"), + CONTROLLER("controllers"), IP("ips"), CLIENT_METRICS("client-metrics"), GROUP("groups"); diff --git a/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java b/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java index 00cdeaa13e401..d05a15a50d413 100644 --- a/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java +++ b/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java @@ -38,6 +38,8 @@ * are user-defined at runtime and cannot be pre-enumerated. They also include * plugin-defined configs (e.g., custom authorizer or quota callback configs) with * arbitrary names. For these reasons BROKER configs are not filtered by name. + * - CONTROLLER: All config names are accepted. the SSL keys used for the CONTROLLER listener + * are prefixed with the listener name (e.g., listener.name.controller.ssl.keystore.location). * - CLIENT_METRICS: Configurations defined in ClientMetricsConfigs * - GROUP: Configurations defined in GroupConfig * @@ -63,6 +65,7 @@ public DefaultSupportedConfigChecker() { this.validConfigsByType = Map.of( ConfigResource.Type.TOPIC, new SetContainsPredicate(LogConfig.configNames()), ConfigResource.Type.BROKER, ignore -> true, + ConfigResource.Type.CONTROLLER, ignore -> true, ConfigResource.Type.CLIENT_METRICS, new SetContainsPredicate(ClientMetricsConfigs.configNames()), ConfigResource.Type.GROUP, new SetContainsPredicate(GroupConfig.configNames()) ); diff --git a/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java b/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java index 2a93bcdc55789..58731ebd5b1ae 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java @@ -92,7 +92,7 @@ import joptsimple.OptionSpec; /** - * This script can be used to change configs for topics/clients/users/brokers/ips/client-metrics/groups dynamically + * This script can be used to change configs for topics/clients/users/brokers/controllers/ips/client-metrics/groups dynamically * An entity described or altered by the command may be one of: *
    *
  • topic: {@code --topic } OR {@code --entity-type topics --entity-name } @@ -101,15 +101,16 @@ *
  • {@code }: {@code --user --client } OR * {@code --entity-type users --entity-name --entity-type clients --entity-name } *
  • broker: {@code --broker } OR {@code --entity-type brokers --entity-name } + *
  • controller: {@code --entity-type controllers --entity-name } *
  • broker-logger: {@code --broker-logger } OR {@code --entity-type broker-loggers --entity-name } *
  • ip: {@code --ip } OR {@code --entity-type ips --entity-name } *
  • client-metrics: {@code --client-metrics } OR {@code --entity-type client-metrics --entity-name } *
  • group: {@code --group } OR {@code --entity-type groups --entity-name } *
- * {@code --entity-type --entity-default} may be specified in place of {@code --entity-type --entity-name } - * when describing or altering default configuration for users, clients, brokers, or ips, respectively. - * Alternatively, {@code --user-defaults}, {@code --client-defaults}, {@code --broker-defaults}, or {@code --ip-defaults} may be specified in place of - * {@code --entity-type --entity-default}, respectively. + * {@code --entity-type --entity-default} may be specified in place of {@code --entity-type --entity-name } + * when describing or altering default configuration for users, clients, brokers, controllers, or ips, respectively. + * Alternatively, {@code --user-defaults}, {@code --client-defaults}, {@code --broker-defaults}, {@code --controller-defaults}, or {@code --ip-defaults} may be specified in place of + * {@code --entity-type --entity-default}, respectively. */ public class ConfigCommand { @@ -120,6 +121,7 @@ public class ConfigCommand { private static final String TOPIC_TYPE = ConfigType.TOPIC.value(); private static final String CLIENT_METRICS_TYPE = ConfigType.CLIENT_METRICS.value(); private static final String BROKER_TYPE = ConfigType.BROKER.value(); + private static final String CONTROLLER_TYPE = ConfigType.CONTROLLER.value(); private static final String GROUP_TYPE = ConfigType.GROUP.value(); private static final String USER_TYPE = ConfigType.USER.value(); private static final String CLIENT_TYPE = ConfigType.CLIENT.value(); @@ -241,7 +243,7 @@ static void alterConfig(Admin adminClient, ConfigCommandOptions opts) throws IOE List configsToBeDeleted = parseConfigsToBeDeleted(opts); if (TOPIC_TYPE.equals(entityType) || CLIENT_METRICS_TYPE.equals(entityType) || - BROKER_TYPE.equals(entityType) || GROUP_TYPE.equals(entityType)) { + BROKER_TYPE.equals(entityType) || CONTROLLER_TYPE.equals(entityType) || GROUP_TYPE.equals(entityType)) { ConfigResource.Type configResourceType; if (TOPIC_TYPE.equals(entityType)) { configResourceType = ConfigResource.Type.TOPIC; @@ -252,6 +254,11 @@ static void alterConfig(Admin adminClient, ConfigCommandOptions opts) throws IOE validateBrokerId(entityName, entityType); } configResourceType = ConfigResource.Type.BROKER; + } else if (CONTROLLER_TYPE.equals(entityType)) { + if (!BROKER_DEFAULT_ENTITY_NAME.equals(entityName)) { + validateBrokerId(entityName, entityType); + } + configResourceType = ConfigResource.Type.CONTROLLER; } else { configResourceType = ConfigResource.Type.GROUP; } @@ -475,7 +482,8 @@ static void describeConfig(Admin adminClient, ConfigCommandOptions opts) throws boolean describeAll = opts.options.has(opts.allOpt); String entityType = entityTypes.get(0); - if (TOPIC_TYPE.equals(entityType) || BROKER_TYPE.equals(entityType) || BROKER_LOGGER_CONFIG_TYPE.equals(entityType) || + if (TOPIC_TYPE.equals(entityType) || BROKER_TYPE.equals(entityType) || CONTROLLER_TYPE.equals(entityType) || + BROKER_LOGGER_CONFIG_TYPE.equals(entityType) || CLIENT_METRICS_TYPE.equals(entityType) || GROUP_TYPE.equals(entityType)) { describeResourceConfig(adminClient, entityType, entityNames.isEmpty() ? Optional.empty() : Optional.of(entityNames.get(0)), describeAll); } else if (USER_TYPE.equals(entityType) || CLIENT_TYPE.equals(entityType)) { @@ -498,11 +506,11 @@ private static void describeResourceConfig(Admin adminClient, String entityType, System.out.println("The " + entityTypeSingular + " '" + name + "' doesn't exist and doesn't have dynamic config."); return; } - } else if (BROKER_TYPE.equals(entityType) || BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) { + } else if (BROKER_TYPE.equals(entityType) || BROKER_LOGGER_CONFIG_TYPE.equals(entityType) || CONTROLLER_TYPE.equals(entityType)) { if (adminClient.describeCluster().nodes().get().stream().anyMatch(n -> n.idString().equals(name))) { - // valid broker id + // valid broker/controller id } else if (BROKER_DEFAULT_ENTITY_NAME.equals(name)) { - // default broker configs + // default broker/controller configs } else { System.out.println("The " + entityTypeSingular + " '" + name + "' doesn't exist and doesn't have dynamic config."); return; @@ -535,7 +543,7 @@ private static void describeResourceConfig(Admin adminClient, String entityType, } else { if (TOPIC_TYPE.equals(entityType)) { entities = new LinkedHashSet<>(adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get()); - } else if (BROKER_TYPE.equals(entityType) || BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) { + } else if (BROKER_TYPE.equals(entityType) || BROKER_LOGGER_CONFIG_TYPE.equals(entityType) || CONTROLLER_TYPE.equals(entityType)) { Set brokerIds = adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().stream() .map(Node::idString) .collect(Collectors.toCollection(LinkedHashSet::new)); @@ -640,6 +648,14 @@ private static DescribeConfigContext describeConfigContext(String entityType, St validateBrokerId(entityName, entityType); dynamicConfigSource = ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG; } + } else if (CONTROLLER_TYPE.equals(entityType)) { + configResourceType = ConfigResource.Type.CONTROLLER; + if (BROKER_DEFAULT_ENTITY_NAME.equals(entityName)) { + dynamicConfigSource = ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_CONTROLLER_CONFIG; + } else { + validateBrokerId(entityName, entityType); + dynamicConfigSource = ConfigEntry.ConfigSource.DYNAMIC_CONTROLLER_CONFIG; + } } else if (BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) { if (!entityName.isEmpty()) { validateBrokerId(entityName, entityType);