Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import kafka.network.RequestChannel;
import kafka.server.KafkaApis;
import kafka.server.KafkaConfig;
import kafka.server.QuotaFactory.QuotaManagers;
import kafka.server.ReplicaManager;
import kafka.server.share.SharePartitionManager;

Expand All @@ -40,6 +39,7 @@
import org.apache.kafka.server.FetchManager;
import org.apache.kafka.server.ForwardingManager;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;

import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package kafka.server.builders;

import kafka.server.KafkaConfig;
import kafka.server.QuotaFactory.QuotaManagers;
import kafka.server.ReplicaManager;

import org.apache.kafka.common.metrics.Metrics;
Expand All @@ -27,6 +26,7 @@
import org.apache.kafka.server.DelayedActionQueue;
import org.apache.kafka.server.common.DirectoryEventHandler;
import org.apache.kafka.server.partition.AlterPartitionManager;
import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.LogManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
*/
package kafka.server.share;

import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
import org.apache.kafka.server.quota.QuotaFactory;
import org.apache.kafka.server.share.LogReader;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublish
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.FetchSession.FetchSessionCache
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.quota.QuotaFactory
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, ShareVersion, TopicIdPartition}
import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
import org.apache.kafka.server.log.remote.metadata.storage.BrokerReadyCallback
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ConfigHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package kafka.server

import java.util.{Collections, Properties}
import kafka.server.QuotaFactory.QuotaManagers
import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.common.metrics.Quota._
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.util.Map.Entry
import java.util.concurrent.CompletableFuture
import java.util.function.Consumer
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers
import kafka.server.logger.RuntimeLoggerManager
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{AlterConfigOp, EndpointType}
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package kafka.server

import kafka.network.SocketServer
import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers
import org.apache.kafka.server.quota.QuotaFactory
import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers
import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicConfigPublisher}

import scala.collection.immutable
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandl
import org.apache.kafka.server.config.{BrokerReconfigurable => JBrokerReconfigurable, DynamicConfig, DynamicProducerStateManagerConfig, ServerConfigs, ServerLogConfigs, DynamicBrokerConfig => JDynamicBrokerConfig}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, MetricConfigs}
import org.apache.kafka.server.quota.QuotaFactory
import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryExporterProvider}
import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock}
import org.apache.kafka.snapshot.RecordsSnapshotReader
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package kafka.server

import kafka.coordinator.transaction.TransactionCoordinator
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
import org.apache.kafka.server.quota.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
import kafka.server.share.SharePartitionManager
import kafka.utils.Logging
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/KafkaBroker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.metadata.{BrokerState, MetadataCache}
import org.apache.kafka.server.quota.QuotaFactory
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.BrokerLifecycleManager
import org.apache.kafka.server.authorizer.Authorizer
Expand Down
3 changes: 0 additions & 3 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
private val _shareCoordinatorConfig = new ShareCoordinatorConfig(this)
def shareCoordinatorConfig: ShareCoordinatorConfig = _shareCoordinatorConfig

private val _quotaConfig = new QuotaConfig(this)
def quotaConfig: QuotaConfig = _quotaConfig

/** ********* General Configuration ***********/
val brokerSessionTimeoutMs: Int = getInt(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG)
val controllerPerformanceSamplePeriodMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package kafka.server

import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
import org.apache.kafka.server.quota.QuotaFactory.UNBOUNDED_QUOTA
import kafka.utils.Logging
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package kafka.server

import com.yammer.metrics.core.Meter
import kafka.cluster.Partition
import kafka.server.QuotaFactory.QuotaManagers
import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported}
import kafka.server.share.DelayedShareFetch
import kafka.utils._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package kafka.server

import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers
import org.apache.kafka.common.errors.ClusterAuthorizationException
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse}
import org.apache.kafka.common.utils.Time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package kafka.server.metadata

import kafka.network.ConnectionQuotas
import kafka.server.QuotaFactory.QuotaManagers
import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers
import kafka.server.metadata.ClientQuotaMetadataManager.transferToClientQuotaEntity
import kafka.utils.Logging
import org.apache.kafka.common.metrics.Quota
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package kafka.server.share;

import kafka.cluster.Partition;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;

import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
Expand All @@ -35,6 +34,7 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.quota.QuotaFactory;
import org.apache.kafka.server.quota.ReplicaQuota;
import org.apache.kafka.server.share.LogReader;
import org.apache.kafka.server.share.PartitionMetadataProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

package kafka.server

import kafka.server.QuotaFactory.QuotaManagers
import org.apache.kafka.server.quota.QuotaFactory
import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartit
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
import org.apache.kafka.server.quota.QuotaFactory
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.server.{LeaderEndPoint, PartitionFetchState, ReplicaState}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.{Collections, Random}
import java.util.concurrent.atomic.AtomicInteger
import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.cluster.Partition
import kafka.server.QuotaFactory.QuotaManagers
import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers
import kafka.server._
import kafka.utils._
import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package kafka.server

import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers
import org.apache.kafka.clients.admin.AlterConfigOp
import org.apache.kafka.common.Uuid.ZERO_UUID
import org.apache.kafka.common.acl.AclOperation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.apache.kafka.server.common.DirectoryEventHandler
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaYammerMetrics, MetricConfigs}
import org.apache.kafka.server.quota.QuotaFactory
import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryContext, ClientTelemetryExporter, ClientTelemetryExporterProvider, ClientTelemetryPayload, ClientTelemetryReceiver}
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogManager, ProducerStateManagerConfig}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package kafka.server

import java.io.File
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.server.quota.QuotaFactory
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.LeaderRecoveryState
import org.junit.jupiter.api._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package kafka.server
import java.util
import java.util.Properties
import kafka.cluster.Partition
import kafka.server.QuotaFactory.QuotaManagers
import org.apache.kafka.server.quota.QuotaFactory
import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.yammer.metrics.core.{Histogram, Meter}
import kafka.cluster.Partition
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers
import kafka.server.share.SharePartitionManager
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package kafka.server

import kafka.cluster.Partition
import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
import org.apache.kafka.server.quota.QuotaFactory.UNBOUNDED_QUOTA
import kafka.server.ReplicaAlterLogDirsThread.ReassignmentState
import kafka.utils.TestUtils
import org.apache.kafka.common.errors.KafkaStorageException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package kafka.server

import kafka.cluster.Partition
import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
import org.apache.kafka.server.quota.QuotaFactory.UNBOUNDED_QUOTA
import kafka.server.epoch.util.MockBlockingSender
import kafka.utils.TestUtils
import org.apache.kafka.clients.FetchSessionHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import java.net.InetAddress
import java.util
import java.util.concurrent.{CompletableFuture, Executors, LinkedBlockingQueue, TimeUnit}
import java.util.{Optional, Properties}
import kafka.server.QuotaFactory.QuotaManagers
import org.apache.kafka.server.quota.QuotaFactory
import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers
import kafka.utils.TestUtils.waitUntilTrue
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.common
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package kafka.server

import java.util.{Collections, Optional, Properties}
import kafka.cluster.{Partition, PartitionTest}
import kafka.server.QuotaFactory.QuotaManagers
import org.apache.kafka.server.quota.QuotaFactory
import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers
import kafka.utils._
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.metrics.Metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import kafka.cluster.PartitionTest.MockPartitionListener
import kafka.cluster.Partition
import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS
import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics
import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
import org.apache.kafka.server.quota.QuotaFactory
import org.apache.kafka.server.quota.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
import kafka.server.epoch.util.MockBlockingSender
import kafka.server.share.{DelayedShareFetch, SharePartition}
import kafka.utils.TestUtils.waitUntilTrue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
*/
package kafka.server.epoch

import kafka.server.QuotaFactory.QuotaManagers
import org.apache.kafka.server.quota.QuotaFactory
import org.apache.kafka.server.quota.QuotaFactory.QuotaManagers
import kafka.server._
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import kafka.server.InitialFetchState;
import kafka.server.KafkaConfig;
import kafka.server.OffsetTruncationState;
import kafka.server.QuotaFactory;
import kafka.server.RemoteLeaderEndPoint;
import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaManager;
Expand Down Expand Up @@ -57,6 +56,7 @@
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.network.BrokerEndPoint;
import org.apache.kafka.server.partition.AlterPartitionManager;
import org.apache.kafka.server.quota.QuotaFactory;
import org.apache.kafka.server.quota.ReplicaQuota;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.MockTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import kafka.network.RequestChannel;
import kafka.server.KafkaApis;
import kafka.server.KafkaConfig;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.builders.KafkaApisBuilder;
import kafka.server.share.SharePartitionManager;
Expand Down Expand Up @@ -65,6 +64,7 @@
import org.apache.kafka.server.quota.ClientQuotaManager;
import org.apache.kafka.server.quota.ClientRequestQuotaManager;
import org.apache.kafka.server.quota.ControllerMutationQuotaManager;
import org.apache.kafka.server.quota.QuotaFactory;
import org.apache.kafka.server.quota.ReplicationQuotaManager;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import kafka.cluster.Partition;
import kafka.server.KafkaConfig;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.builders.LogManagerBuilder;
import kafka.server.builders.ReplicaManagerBuilder;
Expand All @@ -33,6 +32,7 @@
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.server.partition.AlterPartitionManager;
import org.apache.kafka.server.quota.QuotaFactory;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.Scheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import kafka.cluster.Partition;
import kafka.server.KafkaConfig;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.builders.LogManagerBuilder;
import kafka.server.builders.ReplicaManagerBuilder;
Expand All @@ -36,6 +35,7 @@
import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.partition.AlterPartitionManager;
import org.apache.kafka.server.quota.QuotaFactory;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,19 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
AddPartitionsToTxnConfig.CONFIG_DEF
));

private volatile QuotaConfig quotaConfig;

public AbstractKafkaConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
super(definition, originals, configProviderProps, doLog);
}

public QuotaConfig quotaConfig() {
if (quotaConfig == null) {
quotaConfig = new QuotaConfig(this);
}
return quotaConfig;
}

public List<String> logDirs() {
return Optional.ofNullable(getList(ServerLogConfigs.LOG_DIRS_CONFIG)).orElse(getList(ServerLogConfigs.LOG_DIR_CONFIG));
}
Expand Down
Loading
Loading