KAFKA-18491 Remove zkClient & maybeUpdateMetadataCache from ReplicaManager (apache#18507)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
peterxcli authored Jan 13, 2025
1 parent d8c98f2 commit da0c3be
Showing 8 changed files with 17 additions and 47 deletions.
@@ -32,7 +32,6 @@
import kafka.server.QuotaFactory.QuotaManagers;
import kafka.server.ReplicaManager;
import kafka.server.share.DelayedShareFetch;
-import kafka.zk.KafkaZkClient;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
@@ -64,7 +63,6 @@ public class ReplicaManagerBuilder {
private BrokerTopicStats brokerTopicStats = null;
private AtomicBoolean isShuttingDown = new AtomicBoolean(false);
private Optional<RemoteLogManager> remoteLogManager = Optional.empty();
-private Optional<KafkaZkClient> zkClient = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedProduce>> delayedProducePurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedFetch>> delayedFetchPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> delayedDeleteRecordsPurgatory = Optional.empty();
@@ -137,11 +135,6 @@ public ReplicaManagerBuilder setIsShuttingDown(AtomicBoolean isShuttingDown) {
return this;
}

-    public ReplicaManagerBuilder setZkClient(KafkaZkClient zkClient) {
-        this.zkClient = Optional.of(zkClient);
-        return this;
-    }
-
public ReplicaManagerBuilder setDelayedProducePurgatory(DelayedOperationPurgatory<DelayedProduce> delayedProducePurgatory) {
this.delayedProducePurgatory = Optional.of(delayedProducePurgatory);
return this;
@@ -210,7 +203,6 @@ public ReplicaManager build() {
alterPartitionManager,
brokerTopicStats,
isShuttingDown,
-OptionConverters.toScala(zkClient),
OptionConverters.toScala(delayedProducePurgatory),
OptionConverters.toScala(delayedFetchPurgatory),
OptionConverters.toScala(delayedDeleteRecordsPurgatory),
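
Callers that construct a ReplicaManager through this builder only need to drop the setZkClient call. A minimal sketch of the post-change usage in Scala; setIsShuttingDown and setDelayedProducePurgatory come from the diff above, while the other setters and the values passed to them (config, logManager, metadataCache, delayedProducePurgatory) are assumptions for illustration:

    import java.util.concurrent.atomic.AtomicBoolean

    // Sketch only: assumes the remaining builder setters are unchanged by this commit.
    val replicaManager = new ReplicaManagerBuilder()
      .setConfig(config)                                    // assumed setter
      .setLogManager(logManager)                            // assumed setter
      .setMetadataCache(metadataCache)                      // assumed setter
      .setIsShuttingDown(new AtomicBoolean(false))
      .setDelayedProducePurgatory(delayedProducePurgatory)
      // .setZkClient(zkClient)                             // removed by this commit
      .build()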
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
@@ -347,7 +347,6 @@ class BrokerServer(
alterPartitionManager = alterPartitionManager,
brokerTopicStats = brokerTopicStats,
isShuttingDown = isShuttingDown,
-zkClient = None,
threadNamePrefix = None, // The ReplicaManager only runs on the broker, and already includes the ID in thread names.
delayedRemoteFetchPurgatoryParam = None,
brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
32 changes: 4 additions & 28 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -24,10 +24,8 @@ import kafka.log.{LogManager, UnifiedLog}
import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported}
-import kafka.server.metadata.ZkMetadataCache
import kafka.server.share.DelayedShareFetch
import kafka.utils._
-import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
@@ -277,7 +275,6 @@ class ReplicaManager(val config: KafkaConfig,
val alterPartitionManager: AlterPartitionManager,
val brokerTopicStats: BrokerTopicStats = new BrokerTopicStats(),
val isShuttingDown: AtomicBoolean = new AtomicBoolean(false),
-val zkClient: Option[KafkaZkClient] = None,
delayedProducePurgatoryParam: Option[DelayedOperationPurgatory[DelayedProduce]] = None,
delayedFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedFetch]] = None,
delayedDeleteRecordsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None,
@@ -2037,23 +2034,6 @@

def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] = localLog(topicPartition).map(_.config)

-  def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] = {
-    replicaStateChangeLock synchronized {
-      if (updateMetadataRequest.controllerEpoch < controllerEpoch) {
-        val stateControllerEpochErrorMessage = s"Received update metadata request with correlation id $correlationId " +
-          s"from an old controller ${updateMetadataRequest.controllerId} with epoch ${updateMetadataRequest.controllerEpoch}. " +
-          s"Latest known controller epoch is $controllerEpoch"
-        stateChangeLogger.warn(stateControllerEpochErrorMessage)
-        throw new ControllerMovedException(stateChangeLogger.messageWithPrefix(stateControllerEpochErrorMessage))
-      } else {
-        val zkMetadataCache = metadataCache.asInstanceOf[ZkMetadataCache]
-        val deletedPartitions = zkMetadataCache.updateMetadata(correlationId, updateMetadataRequest)
-        controllerEpoch = updateMetadataRequest.controllerEpoch
-        deletedPartitions
-      }
-    }
-  }
-
def becomeLeaderOrFollower(correlationId: Int,
leaderAndIsrRequest: LeaderAndIsrRequest,
onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
@@ -2652,15 +2632,11 @@
    }

    if (notifyController) {
-      if (zkClient.isEmpty) {
-        if (uuid.isDefined) {
-          directoryEventHandler.handleFailure(uuid.get)
-        } else {
-          fatal(s"Unable to propagate directory failure disabled because directory $dir has no UUID")
-          Exit.halt(1)
-        }
+      if (uuid.isDefined) {
+        directoryEventHandler.handleFailure(uuid.get)
      } else {
-        zkClient.get.propagateLogDirEvent(localBrokerId)
+        fatal(s"Unable to propagate directory failure disabled because directory $dir has no UUID")
+        Exit.halt(1)
      }
    }
    warn(s"Stopped serving replicas in dir $dir")
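
For context on why maybeUpdateMetadataCache can be deleted outright: in KRaft mode the broker never receives an UpdateMetadataRequest, so there is no controller-epoch check and no ZkMetadataCache cast; topic and leadership changes instead reach ReplicaManager as metadata-image deltas. A rough sketch of that replacement flow, assuming the applyDelta entry point and the image types from trunk (none of which are touched by this diff):

    import org.apache.kafka.image.{MetadataImage, TopicsDelta}

    // Hypothetical helper showing the KRaft-side call shape; the real wiring lives in the
    // broker's metadata publisher, not in this commit.
    def onMetadataChange(replicaManager: ReplicaManager,
                         topicsDelta: TopicsDelta,
                         newImage: MetadataImage): Unit = {
      replicaManager.applyDelta(topicsDelta, newImage)
    }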
@@ -29,7 +29,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.server.common.OffsetAndEpoch
+import org.apache.kafka.server.common.{KRaftVersion, OffsetAndEpoch}
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.util.{MockScheduler, MockTime}
import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
@@ -69,7 +69,7 @@ class LocalLeaderEndPointTest extends Logging {
scheduler = new MockScheduler(time),
logManager = mockLogMgr,
quotaManagers = quotaManager,
-metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
+metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterPartitionManager)
val partition = replicaManager.createPartition(topicPartition)
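
The same one-line substitution appears in this test and in every test file that follows: the ZooKeeper-backed cache is swapped for the KRaft-backed one, with a fixed KRaftVersion supplier taking the place of interBrokerProtocolVersion. The shared pattern, lifted from the hunks (config stands for whatever broker config each test already builds):

    import org.apache.kafka.server.common.KRaftVersion

    // Before (ZK mode):
    //   metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion)
    // After (KRaft mode), where the second argument supplies the KRaft version to assume:
    val metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0)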
@@ -27,6 +27,7 @@ import kafka.cluster.Partition
import kafka.server.metadata.MockConfigRepository
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.SimpleRecord
+import org.apache.kafka.server.common.KRaftVersion
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogDirFailureChannel}

@@ -69,7 +70,7 @@ class HighwatermarkPersistenceTest {
scheduler = scheduler,
logManager = logManagers.head,
quotaManagers = quotaManager,
-metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId, configs.head.interBrokerProtocolVersion),
+metadataCache = MetadataCache.kRaftMetadataCache(configs.head.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = logDirFailureChannels.head,
alterPartitionManager = alterIsrManager)
replicaManager.startup()
@@ -127,7 +128,7 @@
scheduler = scheduler,
logManager = logManagers.head,
quotaManagers = quotaManager,
-metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId, configs.head.interBrokerProtocolVersion),
+metadataCache = MetadataCache.kRaftMetadataCache(configs.head.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = logDirFailureChannels.head,
alterPartitionManager = alterIsrManager)
replicaManager.startup()
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.kafka.server.common.KRaftVersion
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{LogDirFailureChannel, LogOffsetMetadata}
@@ -72,7 +73,7 @@ class IsrExpirationTest {
scheduler = null,
logManager = logManager,
quotaManagers = quotaManager,
-metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId, configs.head.interBrokerProtocolVersion),
+metadataCache = MetadataCache.kRaftMetadataCache(configs.head.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(configs.head.logDirs.size),
alterPartitionManager = alterIsrManager)
}
@@ -30,6 +30,7 @@ import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.kafka.server.common.KRaftVersion
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogOffsetSnapshot}
@@ -307,7 +308,7 @@ class ReplicaManagerQuotasTest {
scheduler = scheduler,
logManager = logManager,
quotaManagers = quotaManager,
-metadataCache = MetadataCache.zkMetadataCache(leaderBrokerId, configs.head.interBrokerProtocolVersion),
+metadataCache = MetadataCache.kRaftMetadataCache(leaderBrokerId, () => KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(configs.head.logDirs.size),
alterPartitionManager = alterIsrManager)

@@ -28,7 +28,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
-import org.apache.kafka.server.common.OffsetAndEpoch
+import org.apache.kafka.server.common.{KRaftVersion, OffsetAndEpoch}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.junit.jupiter.api.Assertions._
@@ -72,7 +72,7 @@ class OffsetsForLeaderEpochTest {
scheduler = null,
logManager = logManager,
quotaManagers = quotaManager,
-metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
+metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterIsrManager)
val partition = replicaManager.createPartition(tp)
@@ -101,7 +101,7 @@
scheduler = null,
logManager = logManager,
quotaManagers = quotaManager,
-metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
+metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterIsrManager)
replicaManager.createPartition(tp)
@@ -132,7 +132,7 @@
scheduler = null,
logManager = logManager,
quotaManagers = quotaManager,
-metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
+metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterIsrManager)

