From d39d3cc978f03dc4f1a69a9ce1084c9f0f263446 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Wed, 1 Jul 2026 17:28:08 +0200 Subject: [PATCH] KAFKA-17840: Move QuotaFactory to server module --- .../server/builders/KafkaApisBuilder.java | 2 +- .../builders/ReplicaManagerBuilder.java | 2 +- .../server/share/ReplicaManagerLogReader.java | 2 +- .../scala/kafka/server/BrokerServer.scala | 1 + .../scala/kafka/server/ConfigHandler.scala | 2 +- .../scala/kafka/server/ControllerApis.scala | 2 +- .../scala/kafka/server/ControllerServer.scala | 3 ++- .../kafka/server/DynamicBrokerConfig.scala | 1 + .../main/scala/kafka/server/KafkaApis.scala | 2 +- .../main/scala/kafka/server/KafkaBroker.scala | 1 + .../main/scala/kafka/server/KafkaConfig.scala | 3 --- .../kafka/server/LocalLeaderEndPoint.scala | 2 +- .../scala/kafka/server/ReplicaManager.scala | 2 +- .../kafka/server/RequestHandlerHelper.scala | 2 +- .../metadata/ClientQuotaMetadataManager.scala | 2 +- .../server/share/DelayedShareFetchTest.java | 2 +- .../server/LocalLeaderEndPointTest.scala | 3 ++- .../server/RemoteLeaderEndPointTest.scala | 1 + .../AbstractCoordinatorConcurrencyTest.scala | 2 +- .../kafka/server/ControllerApisTest.scala | 2 +- .../server/DynamicBrokerConfigTest.scala | 1 + .../server/HighwatermarkPersistenceTest.scala | 1 + .../unit/kafka/server/IsrExpirationTest.scala | 3 ++- .../unit/kafka/server/KafkaApisTest.scala | 2 +- .../ReplicaAlterLogDirsThreadTest.scala | 2 +- .../server/ReplicaFetcherThreadTest.scala | 2 +- .../ReplicaManagerConcurrencyTest.scala | 3 ++- .../server/ReplicaManagerQuotasTest.scala | 3 ++- .../kafka/server/ReplicaManagerTest.scala | 3 ++- .../epoch/OffsetsForLeaderEpochTest.scala | 3 ++- .../ReplicaFetcherThreadBenchmark.java | 2 +- .../KRaftMetadataRequestBenchmark.java | 2 +- .../kafka/jmh/server/CheckpointBench.java | 2 +- .../jmh/server/PartitionCreationBench.java | 2 +- .../server/config/AbstractKafkaConfig.java | 9 +++++++ .../kafka/server/quota}/QuotaFactory.java | 26 +++++++------------ 36 files changed, 59 insertions(+), 46 deletions(-) rename {core/src/main/java/kafka/server => server/src/main/java/org/apache/kafka/server/quota}/QuotaFactory.java (89%) diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java index 0b31094ec6bf7..01ef28c542f9c 100644 --- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java +++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java @@ -21,7 +21,6 @@ import kafka.network.RequestChannel; import kafka.server.KafkaApis; import kafka.server.KafkaConfig; -import kafka.server.QuotaFactory.QuotaManagers; import kafka.server.ReplicaManager; import kafka.server.share.SharePartitionManager; @@ -40,6 +39,7 @@ import org.apache.kafka.server.FetchManager; import org.apache.kafka.server.ForwardingManager; import org.apache.kafka.server.authorizer.Authorizer; +import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import java.util.Map; diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java index 561447d9f974b..011af858b968c 100644 --- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java @@ -18,7 +18,6 @@ package kafka.server.builders; import kafka.server.KafkaConfig; -import kafka.server.QuotaFactory.QuotaManagers; import kafka.server.ReplicaManager; import org.apache.kafka.common.metrics.Metrics; @@ -27,6 +26,7 @@ import org.apache.kafka.server.DelayedActionQueue; import org.apache.kafka.server.common.DirectoryEventHandler; import org.apache.kafka.server.partition.AlterPartitionManager; +import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers; import org.apache.kafka.server.util.Scheduler; import org.apache.kafka.storage.internals.log.LogDirFailureChannel; import org.apache.kafka.storage.internals.log.LogManager; diff --git a/core/src/main/java/kafka/server/share/ReplicaManagerLogReader.java b/core/src/main/java/kafka/server/share/ReplicaManagerLogReader.java index acebdb6a735aa..844ef85a25dc8 100644 --- a/core/src/main/java/kafka/server/share/ReplicaManagerLogReader.java +++ b/core/src/main/java/kafka/server/share/ReplicaManagerLogReader.java @@ -16,13 +16,13 @@ */ package kafka.server.share; -import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.server.log.remote.storage.RemoteLogManager; +import org.apache.kafka.server.quota.QuotaFactory; import org.apache.kafka.server.share.LogReader; import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.storage.internals.log.FetchDataInfo; diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index d7502aaa039e3..ae4503042b306 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -47,6 +47,7 @@ import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublish import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager} import org.apache.kafka.server.FetchSession.FetchSessionCache import org.apache.kafka.server.authorizer.Authorizer +import org.apache.kafka.server.quota.QuotaFactory import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, ShareVersion, TopicIdPartition} import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs} import org.apache.kafka.server.log.remote.metadata.storage.BrokerReadyCallback diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 0fc594c2538c4..a61dd7d2fdbe3 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -18,7 +18,7 @@ package kafka.server import java.util.{Collections, Properties} -import kafka.server.QuotaFactory.QuotaManagers +import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers import kafka.utils.Logging import org.apache.kafka.server.config.QuotaConfig import org.apache.kafka.common.metrics.Quota._ diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 9f56906f8223c..081387d6bbbe4 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -24,7 +24,7 @@ import java.util.Map.Entry import java.util.concurrent.CompletableFuture import java.util.function.Consumer import kafka.network.RequestChannel -import kafka.server.QuotaFactory.QuotaManagers +import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers import kafka.server.logger.RuntimeLoggerManager import kafka.utils.Logging import org.apache.kafka.clients.admin.{AlterConfigOp, EndpointType} diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 56f036f090f0f..c2906e4c6745e 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -19,7 +19,8 @@ package kafka.server import kafka.network.SocketServer import kafka.raft.KafkaRaftManager -import kafka.server.QuotaFactory.QuotaManagers +import org.apache.kafka.server.quota.QuotaFactory +import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicConfigPublisher} import scala.collection.immutable diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index b407c2e55ba40..82081d5ec6c7e 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -42,6 +42,7 @@ import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandl import org.apache.kafka.server.config.{BrokerReconfigurable => JBrokerReconfigurable, DynamicConfig, DynamicProducerStateManagerConfig, ServerConfigs, ServerLogConfigs, DynamicBrokerConfig => JDynamicBrokerConfig} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, MetricConfigs} +import org.apache.kafka.server.quota.QuotaFactory import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryExporterProvider} import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock} import org.apache.kafka.snapshot.RecordsSnapshotReader diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 36948c34fb879..7383ef3436c96 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -19,7 +19,7 @@ package kafka.server import kafka.coordinator.transaction.TransactionCoordinator import kafka.network.RequestChannel -import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA} +import org.apache.kafka.server.quota.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA} import kafka.server.handlers.DescribeTopicPartitionsRequestHandler import kafka.server.share.SharePartitionManager import kafka.utils.Logging diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala index 55bf1323029c8..3cd70166e210b 100644 --- a/core/src/main/scala/kafka/server/KafkaBroker.scala +++ b/core/src/main/scala/kafka/server/KafkaBroker.scala @@ -27,6 +27,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok import org.apache.kafka.common.utils.Time import org.apache.kafka.coordinator.group.GroupCoordinator import org.apache.kafka.metadata.{BrokerState, MetadataCache} +import org.apache.kafka.server.quota.QuotaFactory import org.apache.kafka.security.CredentialProvider import org.apache.kafka.server.BrokerLifecycleManager import org.apache.kafka.server.authorizer.Authorizer diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 94b86d83d1f81..03f2c46a89095 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -159,9 +159,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) private val _shareCoordinatorConfig = new ShareCoordinatorConfig(this) def shareCoordinatorConfig: ShareCoordinatorConfig = _shareCoordinatorConfig - private val _quotaConfig = new QuotaConfig(this) - def quotaConfig: QuotaConfig = _quotaConfig - /** ********* General Configuration ***********/ val brokerSessionTimeoutMs: Int = getInt(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG) val controllerPerformanceSamplePeriodMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS) diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala index bd1f230a6ddb3..9f50aa745c1fd 100644 --- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala +++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala @@ -17,7 +17,7 @@ package kafka.server -import kafka.server.QuotaFactory.UNBOUNDED_QUOTA +import org.apache.kafka.server.quota.QuotaFactory.UNBOUNDED_QUOTA import kafka.utils.Logging import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData} diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b109d8d411b7c..3a5c0fd47f621 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -18,7 +18,7 @@ package kafka.server import com.yammer.metrics.core.Meter import kafka.cluster.Partition -import kafka.server.QuotaFactory.QuotaManagers +import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported} import kafka.server.share.DelayedShareFetch import kafka.utils._ diff --git a/core/src/main/scala/kafka/server/RequestHandlerHelper.scala b/core/src/main/scala/kafka/server/RequestHandlerHelper.scala index 29ee72e025b12..164c7c7f6aab5 100644 --- a/core/src/main/scala/kafka/server/RequestHandlerHelper.scala +++ b/core/src/main/scala/kafka/server/RequestHandlerHelper.scala @@ -18,7 +18,7 @@ package kafka.server import kafka.network.RequestChannel -import kafka.server.QuotaFactory.QuotaManagers +import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers import org.apache.kafka.common.errors.ClusterAuthorizationException import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse} import org.apache.kafka.common.utils.Time diff --git a/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala b/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala index 079d257ca475f..fe36172195ed8 100644 --- a/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala +++ b/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala @@ -18,7 +18,7 @@ package kafka.server.metadata import kafka.network.ConnectionQuotas -import kafka.server.QuotaFactory.QuotaManagers +import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers import kafka.server.metadata.ClientQuotaMetadataManager.transferToClientQuotaEntity import kafka.utils.Logging import org.apache.kafka.common.metrics.Quota diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index ba50773b4ac3b..3bd39656cb2a1 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -17,7 +17,6 @@ package kafka.server.share; import kafka.cluster.Partition; -import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; import org.apache.kafka.clients.consumer.internals.ShareAcquireMode; @@ -35,6 +34,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogManager; import org.apache.kafka.server.purgatory.DelayedOperationKey; import org.apache.kafka.server.purgatory.DelayedOperationPurgatory; +import org.apache.kafka.server.quota.QuotaFactory; import org.apache.kafka.server.quota.ReplicaQuota; import org.apache.kafka.server.share.LogReader; import org.apache.kafka.server.share.PartitionMetadataProvider; diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala index 71f331000940b..99facff1f44b3 100644 --- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala +++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala @@ -17,7 +17,8 @@ package kafka.server -import kafka.server.QuotaFactory.QuotaManagers +import org.apache.kafka.server.quota.QuotaFactory +import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers import kafka.utils.{Logging, TestUtils} import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.config.TopicConfig diff --git a/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala index 32150d453b2b2..de0971b8c526e 100644 --- a/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala +++ b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala @@ -29,6 +29,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartit import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch} +import org.apache.kafka.server.quota.QuotaFactory import org.apache.kafka.server.network.BrokerEndPoint import org.apache.kafka.server.util.MockTime import org.apache.kafka.server.{LeaderEndPoint, PartitionFetchState, ReplicaState} diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index abb3f6d4601e8..dd3cabe92ba18 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -23,7 +23,7 @@ import java.util.{Collections, Random} import java.util.concurrent.atomic.AtomicInteger import kafka.coordinator.AbstractCoordinatorConcurrencyTest._ import kafka.cluster.Partition -import kafka.server.QuotaFactory.QuotaManagers +import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers import kafka.server._ import kafka.utils._ import org.apache.kafka.common.{TopicIdPartition, TopicPartition} diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index da1d4d2112a36..6a11bbfbe8ea9 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -18,7 +18,7 @@ package kafka.server import kafka.network.RequestChannel -import kafka.server.QuotaFactory.QuotaManagers +import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers import org.apache.kafka.clients.admin.AlterConfigOp import org.apache.kafka.common.Uuid.ZERO_UUID import org.apache.kafka.common.acl.AclOperation diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 96d865f8531f6..08c1e6737f879 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -40,6 +40,7 @@ import org.apache.kafka.server.common.DirectoryEventHandler import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig} import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaYammerMetrics, MetricConfigs} +import org.apache.kafka.server.quota.QuotaFactory import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryContext, ClientTelemetryExporter, ClientTelemetryExporterProvider, ClientTelemetryPayload, ClientTelemetryReceiver} import org.apache.kafka.server.util.KafkaScheduler import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogManager, ProducerStateManagerConfig} diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index eba5042f0a746..2a8c6544ce423 100755 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -18,6 +18,7 @@ package kafka.server import java.io.File import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.server.quota.QuotaFactory import org.apache.kafka.common.utils.Utils import org.apache.kafka.metadata.LeaderRecoveryState import org.junit.jupiter.api._ diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala index 2c1a5fdb5de21..e4bda1164034d 100644 --- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala @@ -19,7 +19,8 @@ package kafka.server import java.util import java.util.Properties import kafka.cluster.Partition -import kafka.server.QuotaFactory.QuotaManagers +import org.apache.kafka.server.quota.QuotaFactory +import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index f5d838edf8324..80627e3ef2ac9 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -21,7 +21,7 @@ import com.yammer.metrics.core.{Histogram, Meter} import kafka.cluster.Partition import kafka.coordinator.transaction.TransactionCoordinator import kafka.network.RequestChannel -import kafka.server.QuotaFactory.QuotaManagers +import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers import kafka.server.share.SharePartitionManager import kafka.utils.{Logging, TestUtils} import org.apache.kafka.clients.admin.AlterConfigOp.OpType diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index ab5704601420f..f2dddbf13867f 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -17,7 +17,7 @@ package kafka.server import kafka.cluster.Partition -import kafka.server.QuotaFactory.UNBOUNDED_QUOTA +import org.apache.kafka.server.quota.QuotaFactory.UNBOUNDED_QUOTA import kafka.server.ReplicaAlterLogDirsThread.ReassignmentState import kafka.utils.TestUtils import org.apache.kafka.common.errors.KafkaStorageException diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index 736d71f631ab8..dec68a290c091 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -17,7 +17,7 @@ package kafka.server import kafka.cluster.Partition -import kafka.server.QuotaFactory.UNBOUNDED_QUOTA +import org.apache.kafka.server.quota.QuotaFactory.UNBOUNDED_QUOTA import kafka.server.epoch.util.MockBlockingSender import kafka.utils.TestUtils import org.apache.kafka.clients.FetchSessionHandler diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala index 0ed1a999d715e..40c6df611a9fb 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala @@ -22,7 +22,8 @@ import java.net.InetAddress import java.util import java.util.concurrent.{CompletableFuture, Executors, LinkedBlockingQueue, TimeUnit} import java.util.{Optional, Properties} -import kafka.server.QuotaFactory.QuotaManagers +import org.apache.kafka.server.quota.QuotaFactory +import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers import kafka.utils.TestUtils.waitUntilTrue import kafka.utils.{Logging, TestUtils} import org.apache.kafka.common diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index fc4cfec1466f3..ad7b8e6b2c6fa 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -18,7 +18,8 @@ package kafka.server import java.util.{Collections, Optional, Properties} import kafka.cluster.{Partition, PartitionTest} -import kafka.server.QuotaFactory.QuotaManagers +import org.apache.kafka.server.quota.QuotaFactory +import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers import kafka.utils._ import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.metrics.Metrics diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index aa28437835902..31e836123bffe 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -22,7 +22,8 @@ import kafka.cluster.PartitionTest.MockPartitionListener import kafka.cluster.Partition import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics -import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA} +import org.apache.kafka.server.quota.QuotaFactory +import org.apache.kafka.server.quota.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA} import kafka.server.epoch.util.MockBlockingSender import kafka.server.share.{DelayedShareFetch, SharePartition} import kafka.utils.TestUtils.waitUntilTrue diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala index a06aff1eed7a5..f60e4c6d85c32 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala @@ -16,7 +16,8 @@ */ package kafka.server.epoch -import kafka.server.QuotaFactory.QuotaManagers +import org.apache.kafka.server.quota.QuotaFactory +import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers import kafka.server._ import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index a2e55b905b8c3..2f39e92ccd377 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -23,7 +23,6 @@ import kafka.server.InitialFetchState; import kafka.server.KafkaConfig; import kafka.server.OffsetTruncationState; -import kafka.server.QuotaFactory; import kafka.server.RemoteLeaderEndPoint; import kafka.server.ReplicaFetcherThread; import kafka.server.ReplicaManager; @@ -57,6 +56,7 @@ import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.network.BrokerEndPoint; import org.apache.kafka.server.partition.AlterPartitionManager; +import org.apache.kafka.server.quota.QuotaFactory; import org.apache.kafka.server.quota.ReplicaQuota; import org.apache.kafka.server.util.KafkaScheduler; import org.apache.kafka.server.util.MockTime; diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java index 0dad41f0f872a..7f6ee183e99f9 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java @@ -21,7 +21,6 @@ import kafka.network.RequestChannel; import kafka.server.KafkaApis; import kafka.server.KafkaConfig; -import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; import kafka.server.builders.KafkaApisBuilder; import kafka.server.share.SharePartitionManager; @@ -65,6 +64,7 @@ import org.apache.kafka.server.quota.ClientQuotaManager; import org.apache.kafka.server.quota.ClientRequestQuotaManager; import org.apache.kafka.server.quota.ControllerMutationQuotaManager; +import org.apache.kafka.server.quota.QuotaFactory; import org.apache.kafka.server.quota.ReplicationQuotaManager; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java index 480ce85548123..891c5daf45ad8 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java @@ -18,7 +18,6 @@ import kafka.cluster.Partition; import kafka.server.KafkaConfig; -import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; import kafka.server.builders.LogManagerBuilder; import kafka.server.builders.ReplicaManagerBuilder; @@ -33,6 +32,7 @@ import org.apache.kafka.metadata.MetadataCache; import org.apache.kafka.metadata.MockConfigRepository; import org.apache.kafka.server.partition.AlterPartitionManager; +import org.apache.kafka.server.quota.QuotaFactory; import org.apache.kafka.server.util.KafkaScheduler; import org.apache.kafka.server.util.MockTime; import org.apache.kafka.server.util.Scheduler; diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java index 5d1ac018bd705..1cb47774b83ef 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java @@ -18,7 +18,6 @@ import kafka.cluster.Partition; import kafka.server.KafkaConfig; -import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; import kafka.server.builders.LogManagerBuilder; import kafka.server.builders.ReplicaManagerBuilder; @@ -36,6 +35,7 @@ import org.apache.kafka.metadata.MockConfigRepository; import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.server.partition.AlterPartitionManager; +import org.apache.kafka.server.quota.QuotaFactory; import org.apache.kafka.server.util.KafkaScheduler; import org.apache.kafka.server.util.Scheduler; import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints; diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java index 27a9750756355..d60543b2edda1 100644 --- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java @@ -89,10 +89,19 @@ public abstract class AbstractKafkaConfig extends AbstractConfig { AddPartitionsToTxnConfig.CONFIG_DEF )); + private volatile QuotaConfig quotaConfig; + public AbstractKafkaConfig(ConfigDef definition, Map originals, Map configProviderProps, boolean doLog) { super(definition, originals, configProviderProps, doLog); } + public QuotaConfig quotaConfig() { + if (quotaConfig == null) { + quotaConfig = new QuotaConfig(this); + } + return quotaConfig; + } + public List logDirs() { return Optional.ofNullable(getList(ServerLogConfigs.LOG_DIRS_CONFIG)).orElse(getList(ServerLogConfigs.LOG_DIR_CONFIG)); } diff --git a/core/src/main/java/kafka/server/QuotaFactory.java b/server/src/main/java/org/apache/kafka/server/quota/QuotaFactory.java similarity index 89% rename from core/src/main/java/kafka/server/QuotaFactory.java rename to server/src/main/java/org/apache/kafka/server/quota/QuotaFactory.java index aa28856cdbbff..952250887a1f3 100644 --- a/core/src/main/java/kafka/server/QuotaFactory.java +++ b/server/src/main/java/org/apache/kafka/server/quota/QuotaFactory.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package kafka.server; +package org.apache.kafka.server.quota; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.Plugin; @@ -22,16 +22,10 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.metadata.publisher.QuotaConfigChangeListener; +import org.apache.kafka.server.config.AbstractKafkaConfig; import org.apache.kafka.server.config.ClientQuotaManagerConfig; import org.apache.kafka.server.config.QuotaConfig; import org.apache.kafka.server.config.ReplicationQuotaManagerConfig; -import org.apache.kafka.server.quota.ClientQuotaCallback; -import org.apache.kafka.server.quota.ClientQuotaManager; -import org.apache.kafka.server.quota.ClientRequestQuotaManager; -import org.apache.kafka.server.quota.ControllerMutationQuotaManager; -import org.apache.kafka.server.quota.QuotaType; -import org.apache.kafka.server.quota.ReplicaQuota; -import org.apache.kafka.server.quota.ReplicationQuotaManager; import java.util.Optional; @@ -83,7 +77,7 @@ public QuotaConfigChangeListener quotaConfigChangeListener() { } public static QuotaManagers instantiate( - KafkaConfig cfg, + AbstractKafkaConfig cfg, Metrics metrics, Time time, String threadNamePrefix, @@ -104,8 +98,8 @@ public static QuotaManagers instantiate( } private static Optional> createClientQuotaCallback( - KafkaConfig cfg, - Metrics metrics, + AbstractKafkaConfig cfg, + Metrics metrics, String role ) { ClientQuotaCallback clientQuotaCallback = cfg.getConfiguredInstance( @@ -118,31 +112,31 @@ private static Optional> createClientQuotaCallback( )); } - private static ClientQuotaManagerConfig clientConfig(KafkaConfig cfg) { + private static ClientQuotaManagerConfig clientConfig(AbstractKafkaConfig cfg) { return new ClientQuotaManagerConfig( cfg.quotaConfig().numQuotaSamples(), cfg.quotaConfig().quotaWindowSizeSeconds() ); } - private static ClientQuotaManagerConfig clientControllerMutationConfig(KafkaConfig cfg) { + private static ClientQuotaManagerConfig clientControllerMutationConfig(AbstractKafkaConfig cfg) { return new ClientQuotaManagerConfig( cfg.quotaConfig().numControllerQuotaSamples(), cfg.quotaConfig().controllerQuotaWindowSizeSeconds() ); } - private static ReplicationQuotaManagerConfig replicationConfig(KafkaConfig cfg) { + private static ReplicationQuotaManagerConfig replicationConfig(AbstractKafkaConfig cfg) { return new ReplicationQuotaManagerConfig( cfg.quotaConfig().numReplicationQuotaSamples(), cfg.quotaConfig().replicationQuotaWindowSizeSeconds() ); } - private static ReplicationQuotaManagerConfig alterLogDirsReplicationConfig(KafkaConfig cfg) { + private static ReplicationQuotaManagerConfig alterLogDirsReplicationConfig(AbstractKafkaConfig cfg) { return new ReplicationQuotaManagerConfig( cfg.quotaConfig().numAlterLogDirsReplicationQuotaSamples(), cfg.quotaConfig().alterLogDirsReplicationQuotaWindowSizeSeconds() ); } -} +} \ No newline at end of file