Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,17 @@ public enum ConfigType {
* Source of configuration entries.
*/
public enum ConfigSource {
DYNAMIC_TOPIC_CONFIG, // dynamic topic config that is configured for a specific topic
DYNAMIC_BROKER_LOGGER_CONFIG, // dynamic broker logger config that is configured for a specific broker
DYNAMIC_BROKER_CONFIG, // dynamic broker config that is configured for a specific broker
DYNAMIC_DEFAULT_BROKER_CONFIG, // dynamic broker config that is configured as default for all brokers in the cluster
DYNAMIC_CLIENT_METRICS_CONFIG, // dynamic client metrics subscription config that is configured for all clients
DYNAMIC_GROUP_CONFIG, // dynamic group config that is configured for a specific group
STATIC_BROKER_CONFIG, // static broker config provided as broker properties at start up (e.g. server.properties file)
DEFAULT_CONFIG, // built-in default configuration for configs that have a default value
UNKNOWN // source unknown e.g. in the ConfigEntry used for alter requests where source is not set
DYNAMIC_TOPIC_CONFIG, // dynamic topic config that is configured for a specific topic
DYNAMIC_BROKER_LOGGER_CONFIG, // dynamic broker logger config that is configured for a specific broker
DYNAMIC_BROKER_CONFIG, // dynamic broker config that is configured for a specific broker
DYNAMIC_DEFAULT_BROKER_CONFIG, // dynamic broker config that is configured as default for all brokers in the cluster
DYNAMIC_CONTROLLER_CONFIG, // dynamic controller config that is configured for a specific controller
DYNAMIC_DEFAULT_CONTROLLER_CONFIG, // dynamic controller config that is configured as default for all controllers in the cluster
DYNAMIC_CLIENT_METRICS_CONFIG, // dynamic client metrics subscription config that is configured for all clients
DYNAMIC_GROUP_CONFIG, // dynamic group config that is configured for a specific group
STATIC_BROKER_CONFIG, // static broker config provided as broker properties at start up (e.g. server.properties file)
DEFAULT_CONFIG, // built-in default configuration for configs that have a default value
UNKNOWN // source unknown e.g. in the ConfigEntry used for alter requests where source is not set
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public final class ConfigResource {
* Type of resource.
*/
public enum Type {
CONTROLLER((byte) 64),
GROUP((byte) 32),
CLIENT_METRICS((byte) 16),
BROKER_LOGGER((byte) 8),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ public enum ConfigSource {
DEFAULT_CONFIG((byte) 5, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DEFAULT_CONFIG),
DYNAMIC_BROKER_LOGGER_CONFIG((byte) 6, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG),
CLIENT_METRICS_CONFIG((byte) 7, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG),
GROUP_CONFIG((byte) 8, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG);
GROUP_CONFIG((byte) 8, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG),
DYNAMIC_CONTROLLER_CONFIG((byte) 9, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_CONTROLLER_CONFIG),
DYNAMIC_DEFAULT_CONTROLLER_CONFIG((byte) 10, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_CONTROLLER_CONFIG);

final byte id;
private final org.apache.kafka.clients.admin.ConfigEntry.ConfigSource source;
Expand Down
32 changes: 19 additions & 13 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,12 @@ class DataPlaneAcceptor(socketServer: SocketServer,
}
}

/**
* Returns the processors for this acceptor.
* Used by controller dynamic config to access channel builders.
*/
def getProcessors: Seq[Processor] = processors.toSeq

/**
* Reconfigures this instance with the given key-value pairs. The provided
* map contains all configs including any reconfigurable configs that
Expand Down Expand Up @@ -806,7 +812,7 @@ private[kafka] class Processor(
connectionQuotas: ConnectionQuotas,
connectionsMaxIdleMs: Long,
failedAuthenticationDelayMs: Int,
listenerName: ListenerName,
val listenerName: ListenerName,
securityProtocol: SecurityProtocol,
config: KafkaConfig,
metrics: Metrics,
Expand Down Expand Up @@ -850,20 +856,20 @@ private[kafka] class Processor(
private val expiredConnectionsKilledCountMetricName = metrics.metricName("expired-connections-killed-count", JSocketServer.METRICS_GROUP, metricTags)
metrics.addMetric(expiredConnectionsKilledCountMetricName, expiredConnectionsKilledCount)

private[network] val selector = createSelector(
ChannelBuilders.serverChannelBuilder(
listenerName,
listenerName == config.interBrokerListenerName,
securityProtocol,
config,
credentialProvider.credentialCache,
credentialProvider.tokenCache,
time,
logContext,
version => apiVersionManager.apiVersionResponse(0, version < 4)
)
private[kafka] val channelBuilder: ChannelBuilder = ChannelBuilders.serverChannelBuilder(
listenerName,
listenerName == config.interBrokerListenerName,
securityProtocol,
config,
credentialProvider.credentialCache,
credentialProvider.tokenCache,
time,
logContext,
version => apiVersionManager.apiVersionResponse(0, version < 4)
)

private[network] val selector = createSelector(channelBuilder)

// Visible to override for testing
protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {
channelBuilder match {
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/scala/kafka/raft/KafkaRaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.protocol.ApiMessage
import org.apache.kafka.common.requests.RequestContext
import org.apache.kafka.common.requests.RequestHeader
Expand Down Expand Up @@ -126,6 +126,7 @@ class KafkaRaftManager[T](
}

override val raftLog: RaftLog = buildMetadataLog()
private var raftClientChannelBuilder: ChannelBuilder = _
private val netChannel = buildNetworkChannel()
private val expirationTimer = new SystemTimer("raft-expiration-executor")
private val expirationService = new TimingWheelExpirationService(expirationTimer)
Expand Down Expand Up @@ -162,6 +163,8 @@ class KafkaRaftManager[T](
clientDriver.handleRequest(context, header, request, createdTimeMs)
}

def clientChannelBuilder: ChannelBuilder = raftClientChannelBuilder

private def buildRaftClient(): KafkaRaftClient[T] = {
new KafkaRaftClient(
OptionalInt.of(config.nodeId),
Expand Down Expand Up @@ -217,6 +220,7 @@ class KafkaRaftManager[T](
time,
logContext
)
raftClientChannelBuilder = channelBuilder

val metricGroupPrefix = "raft-channel"
val collectPerConnectionMetrics = false
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/ConfigAdminManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import kafka.utils._
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigDef.ConfigKey
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, CLIENT_METRICS, GROUP, TOPIC}
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, CLIENT_METRICS, CONTROLLER, GROUP, TOPIC}
import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidRequestException}
import org.apache.kafka.common.message.{AlterConfigsRequestData, AlterConfigsResponseData, IncrementalAlterConfigsRequestData, IncrementalAlterConfigsResponseData}
Expand Down Expand Up @@ -144,7 +144,7 @@ class ConfigAdminManager(nodeId: Int,
validateResourceNameIsCurrentNodeId(resource.resourceName())
}
validateBrokerConfigChange(resource, configResource)
case TOPIC | CLIENT_METRICS | GROUP =>
case TOPIC | CLIENT_METRICS | GROUP | CONTROLLER =>
// Nothing to do.
case _ =>
throw new InvalidRequestException(s"Unknown resource type ${resource.resourceType().toInt}")
Expand Down Expand Up @@ -243,7 +243,7 @@ class ConfigAdminManager(nodeId: Int,
validateResourceNameIsCurrentNodeId(resource.resourceName())
}
validateBrokerConfigChange(resource, configResource)
case TOPIC | CLIENT_METRICS | GROUP =>
case TOPIC | CLIENT_METRICS | GROUP | CONTROLLER =>
// Nothing to do.
case _ =>
// Since legacy AlterConfigs does not support BROKER_LOGGER, any attempt to use it
Expand Down
27 changes: 25 additions & 2 deletions core/src/main/scala/kafka/server/ConfigHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
val describeConfigsRequest = request.body(classOf[DescribeConfigsRequest])
val (authorizedResources, unauthorizedResources) = describeConfigsRequest.data.resources.asScala.partition { resource =>
ConfigResource.Type.forId(resource.resourceType) match {
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER | ConfigResource.Type.CLIENT_METRICS =>
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER | ConfigResource.Type.CLIENT_METRICS | ConfigResource.Type.CONTROLLER =>
authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)
case ConfigResource.Type.TOPIC =>
authHelper.authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.resourceName)
Expand All @@ -64,7 +64,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
val authorizedConfigs = describeConfigs(authorizedResources.toList, describeConfigsRequest.data.includeSynonyms, describeConfigsRequest.data.includeDocumentation)
val unauthorizedConfigs = unauthorizedResources.map { resource =>
val error = ConfigResource.Type.forId(resource.resourceType) match {
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER | ConfigResource.Type.CLIENT_METRICS => Errors.CLUSTER_AUTHORIZATION_FAILED
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER | ConfigResource.Type.CLIENT_METRICS | ConfigResource.Type.CONTROLLER => Errors.CLUSTER_AUTHORIZATION_FAILED
case ConfigResource.Type.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED
case ConfigResource.Type.GROUP => Errors.GROUP_AUTHORIZATION_FAILED
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}")
Expand Down Expand Up @@ -136,6 +136,29 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
createResponseConfig(resource, groupConfig, createGroupConfigEntry(groupConfig, groupProps, includeSynonyms, includeDocumentation)(_, _))
}

case ConfigResource.Type.CONTROLLER =>
val controllerProps = configRepository.config(
new ConfigResource(ConfigResource.Type.CONTROLLER, resource.resourceName))
val isDefault = resource.resourceName == null || resource.resourceName.isEmpty
val sourceId =
if (isDefault) ConfigSource.DYNAMIC_DEFAULT_CONTROLLER_CONFIG.id
else ConfigSource.DYNAMIC_CONTROLLER_CONFIG.id
val entries = controllerProps.entrySet.asScala.map { e =>
val name = e.getKey.toString
val value = e.getValue.toString
val isSensitive = name.toLowerCase.contains("password")
new DescribeConfigsResponseData.DescribeConfigsResourceResult()
.setName(name)
.setValue(if (isSensitive) null else value)
.setConfigSource(sourceId)
.setIsSensitive(isSensitive)
.setReadOnly(false)
.setSynonyms(Collections.emptyList())
}.toList.asJava
new DescribeConfigsResponseData.DescribeConfigsResult()
.setErrorCode(Errors.NONE.code)
.setConfigs(entries)

case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType")
}
configResult.setResourceName(resource.resourceName).setResourceType(resource.resourceType)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ class ControllerApis(
private def authorizeAlterResource(requestContext: RequestContext,
resource: ConfigResource): ApiError = {
resource.`type` match {
case ConfigResource.Type.BROKER | ConfigResource.Type.CLIENT_METRICS =>
case ConfigResource.Type.BROKER | ConfigResource.Type.CLIENT_METRICS | ConfigResource.Type.CONTROLLER =>
if (authHelper.authorize(requestContext, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) {
new ApiError(NONE)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package kafka.server

import java.util
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, GROUP, TOPIC}
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, CONTROLLER, GROUP, TOPIC}
import org.apache.kafka.controller.ConfigurationValidator
import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException}
import org.apache.kafka.common.internals.Topic
Expand Down Expand Up @@ -113,6 +113,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
resource.`type`() match {
case TOPIC => validateTopicName(resource.name())
case BROKER => validateBrokerName(resource.name())
case CONTROLLER => validateBrokerName(resource.name())
case _ => throwExceptionForUnknownResourceType(resource)
}
}
Expand All @@ -129,6 +130,9 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
LogConfig.validate(oldConfigs, filteredConfigs, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
case BROKER => validateBrokerName(resource.name())
case CONTROLLER =>
validateBrokerName(resource.name())
filterAndValidateNullConfigs(newConfigs, "controller")
case CLIENT_METRICS =>
val filteredConfigs = filterAndValidateNullConfigs(newConfigs, "client metrics")
ClientMetricsConfigs.validate(resource.name(), filteredConfigs)
Expand Down
41 changes: 41 additions & 0 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,47 @@ class ControllerServer(
),
"controller"))

// Set up the controller dynamic config manager for SSL reconfiguration.
val dynamicControllerConfig = new kafka.server.controller.DynamicControllerConfig(config)

import scala.jdk.CollectionConverters._
config.controllerListenerNames.asScala.foreach { listenerName =>
socketServer.dataPlaneAcceptor(listenerName).foreach { acceptor =>
acceptor.getProcessors.foreach { processor =>
processor.channelBuilder match {
case sslBuilder: org.apache.kafka.common.network.SslChannelBuilder =>
val wrapper = new kafka.server.controller.DynamicControllerSslListener(
sslBuilder,
processor.listenerName
)
dynamicControllerConfig.addReconfigurable(wrapper)
info(s"Registered SSL reconfigurable for controller inbound listener ${processor.listenerName}")
case _ =>
}
}
}
}

// Register the outbound raft client's SSL channel builder for SSL hot-reload.
Option(sharedServer.raftManager).foreach { raftManager =>
raftManager.clientChannelBuilder match {
case sslBuilder: org.apache.kafka.common.network.SslChannelBuilder =>
val controllerListenerName = new ListenerName(config.controllerListenerNames.get(0))
val wrapper = new kafka.server.controller.DynamicControllerSslListener(
sslBuilder,
controllerListenerName
)
dynamicControllerConfig.addReconfigurable(wrapper)
info(s"Registered SSL reconfigurable for raft outbound client (listener $controllerListenerName)")
case _ =>
}
}

metadataPublishers.add(new org.apache.kafka.image.publisher.ControllerDynamicConfigPublisher(
config.nodeId,
(configs: java.util.Map[String, String]) => dynamicControllerConfig.apply(configs)
))

// Register this instance for dynamic config changes to the KafkaConfig. This must be called
// after the authorizer and quotaManagers are initialized, since it references those objects.
// It must be called before DynamicClientQuotaPublisher is installed, since otherwise we may
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server.controller

import kafka.server.KafkaConfig
import kafka.utils.Logging
import org.apache.kafka.controller.util.ControllerListenerReconfigurable

import java.util.concurrent.CopyOnWriteArrayList
import scala.jdk.CollectionConverters._

/**
* Controller-side dynamic config manager.
* Owns the current effective config map, the registry of reconfigurables,
* and dispatches config updates on metadata replay.
*/
class DynamicControllerConfig(
config: KafkaConfig
) extends Logging {

private val reconfigurables = new CopyOnWriteArrayList[ControllerListenerReconfigurable]()
@volatile private var currentConfig: Map[String, String] = Map.empty

def addReconfigurable(reconfigurable: ControllerListenerReconfigurable): Unit = {
reconfigurables.add(reconfigurable)
}

def removeReconfigurable(reconfigurable: ControllerListenerReconfigurable): Unit = {
reconfigurables.remove(reconfigurable)
}

/**
* Apply a new effective config map from the metadata log.
* This is called by ControllerDynamicConfigPublisher on each metadata update.
*/
def apply(newConfig: java.util.Map[String, String]): Unit = {
val newConfigScala = newConfig.asScala.toMap

if (newConfigScala != currentConfig) {
info(s"Applying controller config update: ${newConfigScala.size} configs")

val iterator = reconfigurables.iterator()
while (iterator.hasNext) {
val reconfigurable = iterator.next()
try {
reconfigurable.validateReconfiguration(newConfig)
reconfigurable.reconfigure(newConfig)
} catch {
case e: Exception =>
error(s"Controller dynamic config update rejected for one reconfigurable; " +
s"previous configuration retained. Fix and re-alter.", e)
}
}
currentConfig = newConfigScala
}
}

def currentValue(key: String): Option[String] = {
currentConfig.get(key)
}
}
Loading