Skip to content

Commit

Permalink
KAFKA-17565: Move MetadataCache interface to metadata module
Browse files Browse the repository at this point in the history
Signed-off-by: PoAn Yang <[email protected]>
  • Loading branch information
FrankYang0529 committed Feb 9, 2025
1 parent 8b22f10 commit 82f5cf6
Show file tree
Hide file tree
Showing 64 changed files with 554 additions and 517 deletions.
2 changes: 1 addition & 1 deletion checkstyle/import-control-core.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
</subpackage>

<subpackage name="coordinator">
<allow class="kafka.server.MetadataCache" />
<allow class="org.apache.kafka.metadata.MetadataCache" />
</subpackage>

<subpackage name="examples">
Expand Down
2 changes: 2 additions & 0 deletions checkstyle/import-control-metadata.xml
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,14 @@
</subpackage>

<subpackage name="metadata">
<allow pkg="org.apache.kafka.admin" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.acl" />
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
<allow pkg="org.apache.kafka.common.record" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import kafka.server.ForwardingManager;
import kafka.server.KafkaApis;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory.QuotaManagers;
import kafka.server.ReplicaManager;
import kafka.server.share.SharePartitionManager;
Expand All @@ -36,6 +35,7 @@
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.coordinator.share.ShareCoordinator;
import org.apache.kafka.metadata.ConfigRepository;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.ClientMetricsManager;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import kafka.log.LogManager;
import kafka.server.AlterPartitionManager;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory.QuotaManagers;
import kafka.server.ReplicaManager;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.DelayedActionQueue;
import org.apache.kafka.server.common.DirectoryEventHandler;
import org.apache.kafka.server.util.Scheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import kafka.network.RequestChannel;
import kafka.server.AuthHelper;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
Expand All @@ -31,15 +30,14 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.metadata.MetadataCache;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

import scala.jdk.javaapi.CollectionConverters;

import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
import static org.apache.kafka.common.resource.ResourceType.TOPIC;

Expand All @@ -65,7 +63,7 @@ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(
DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
String cursorTopicName = cursor != null ? cursor.topicName() : "";
if (fetchAllTopics) {
CollectionConverters.asJavaCollection(metadataCache.getAllTopics()).forEach(topicName -> {
metadataCache.getAllTopics().forEach(topicName -> {
if (topicName.compareTo(cursorTopicName) >= 0) {
topics.add(topicName);
}
Expand Down Expand Up @@ -105,7 +103,7 @@ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(
});

DescribeTopicPartitionsResponseData response = metadataCache.describeTopicResponse(
CollectionConverters.asScala(authorizedTopicsStream.iterator()),
authorizedTopicsStream.iterator(),
abstractRequest.context().listenerName,
(String topicName) -> topicName.equals(cursorTopicName) ? cursor.partitionIndex() : 0,
Math.max(Math.min(config.maxRequestPartitionSizeLimit(), request.responsePartitionLimit()), 1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

package kafka.server.share;

import kafka.server.MetadataCache;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;

Expand All @@ -37,9 +36,6 @@
import java.util.Set;
import java.util.function.Function;

import scala.jdk.javaapi.CollectionConverters;
import scala.jdk.javaapi.OptionConverters;

public class ShareCoordinatorMetadataCacheHelperImpl implements ShareCoordinatorMetadataCacheHelper {
private final MetadataCache metadataCache;
private final Function<SharePartitionKey, Integer> keyToPartitionMapper;
Expand Down Expand Up @@ -73,13 +69,11 @@ public Node getShareCoordinator(SharePartitionKey key, String internalTopicName)
Set<String> topicSet = new HashSet<>();
topicSet.add(internalTopicName);

List<MetadataResponseData.MetadataResponseTopic> topicMetadata = CollectionConverters.asJava(
metadataCache.getTopicMetadata(
CollectionConverters.asScala(topicSet),
interBrokerListenerName,
false,
false
)
List<MetadataResponseData.MetadataResponseTopic> topicMetadata = metadataCache.getTopicMetadata(
topicSet,
interBrokerListenerName,
false,
false
);

if (topicMetadata == null || topicMetadata.isEmpty() || topicMetadata.get(0).errorCode() != Errors.NONE.code()) {
Expand All @@ -92,7 +86,7 @@ public Node getShareCoordinator(SharePartitionKey key, String internalTopicName)
.findFirst();

if (response.isPresent()) {
return OptionConverters.toJava(metadataCache.getAliveBrokerNode(response.get().leaderId(), interBrokerListenerName))
return metadataCache.getAliveBrokerNode(response.get().leaderId(), interBrokerListenerName)
.orElse(Node.noNode());
} else {
return Node.noNode();
Expand All @@ -108,7 +102,7 @@ public Node getShareCoordinator(SharePartitionKey key, String internalTopicName)
@Override
public List<Node> getClusterNodes() {
try {
return CollectionConverters.asJava(metadataCache.getAliveBrokerNodes(interBrokerListenerName).toSeq());
return metadataCache.getAliveBrokerNodes(interBrokerListenerName);
} catch (Exception e) {
log.warn("Exception while getting cluster nodes", e);
}
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, VerificationGuard}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
Expand Down Expand Up @@ -1068,9 +1068,9 @@ class Partition(val topicPartition: TopicPartition,
isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)
}

private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long], cachedBrokerEpoch: Option[Long]): Boolean = {
storedBrokerEpoch.isDefined && cachedBrokerEpoch.isDefined &&
(storedBrokerEpoch.get == -1 || storedBrokerEpoch == cachedBrokerEpoch)
private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long], cachedBrokerEpoch: Optional[java.lang.Long]): Boolean = {
storedBrokerEpoch.isDefined && cachedBrokerEpoch.isPresent &&
(storedBrokerEpoch.get == -1 || storedBrokerEpoch.get == cachedBrokerEpoch.get)
}

/*
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/cluster/Replica.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package kafka.cluster

import kafka.log.UnifiedLog
import kafka.server.MetadataCache
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.storage.internals.log.LogOffsetMetadata

import java.util.concurrent.atomic.AtomicReference
Expand Down Expand Up @@ -114,7 +114,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition, val metadat
replicaState.updateAndGet { currentReplicaState =>
val cachedBrokerEpoch = metadataCache.getAliveBrokerEpoch(brokerId)
// Fence the update if it provides a stale broker epoch.
if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {
if (brokerEpoch != -1 && cachedBrokerEpoch.isPresent && cachedBrokerEpoch.get > brokerEpoch) {
throw new NotLeaderOrFollowerException(s"Received stale fetch state update. broker epoch=$brokerEpoch " +
s"vs expected=${currentReplicaState.brokerEpoch.get}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package kafka.coordinator.transaction

import kafka.server.{KafkaConfig, MetadataCache, ReplicaManager}
import kafka.server.{KafkaConfig, ReplicaManager}
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
Expand All @@ -28,6 +28,7 @@ import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, TransactionResult}
import org.apache.kafka.common.utils.{LogContext, ProducerIdAndEpoch, Time}
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import org.apache.kafka.server.util.Scheduler

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import kafka.coordinator.transaction.TransactionMarkerChannelManager.{LogAppendR

import java.util
import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue}
import kafka.server.{KafkaConfig, MetadataCache}
import kafka.server.KafkaConfig
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.common.metrics.Metrics
Expand All @@ -32,12 +32,14 @@ import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersReque
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}

import scala.collection.{concurrent, immutable}
import scala.jdk.CollectionConverters._
import scala.jdk.javaapi.OptionConverters

object TransactionMarkerChannelManager {
private val UnknownDestinationQueueSizeMetricName = "UnknownDestinationQueueSize"
Expand Down Expand Up @@ -382,7 +384,7 @@ class TransactionMarkerChannelManager(
topicPartitions: immutable.Set[TopicPartition]): Unit = {
val txnTopicPartition = txnStateManager.partitionFor(pendingCompleteTxn.transactionalId)
val partitionsByDestination: immutable.Map[Option[Node], immutable.Set[TopicPartition]] = topicPartitions.groupBy { topicPartition: TopicPartition =>
metadataCache.getPartitionLeaderEndpoint(topicPartition.topic, topicPartition.partition, interBrokerListenerName)
OptionConverters.toScala(metadataCache.getPartitionLeaderEndpoint(topicPartition.topic, topicPartition.partition, interBrokerListenerName))
}

val coordinatorEpoch = pendingCompleteTxn.coordinatorEpoch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.server.{MetadataCache, ReplicaManager}
import kafka.server.ReplicaManager
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils.{Logging, Pool}
import org.apache.kafka.common.config.TopicConfig
Expand All @@ -35,6 +35,7 @@ import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.server.record.BrokerCompressionType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartiti
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, MetadataResponse}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}

import java.util
import java.util.concurrent.TimeUnit
import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
import scala.jdk.javaapi.OptionConverters

object AddPartitionsToTxnManager {
type AppendCallback = Map[TopicPartition, Errors] => Unit
Expand Down Expand Up @@ -186,9 +188,9 @@ class AddPartitionsToTxnManager(
}

private def getTransactionCoordinator(partition: Int): Option[Node] = {
metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
OptionConverters.toScala(metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition))
.filter(_.leader != MetadataResponse.NO_LEADER_ID)
.flatMap(metadata => metadataCache.getAliveBrokerNode(metadata.leader, interBrokerListenerName))
.flatMap(metadata => OptionConverters.toScala(metadataCache.getAliveBrokerNode(metadata.leader, interBrokerListenerName)))
}

private def topicPartitionsToError(transactionData: AddPartitionsToTxnTransaction, error: Errors): Map[TopicPartition, Errors] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.requests.{AlterPartitionRequest, AlterPartitionResponse}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache}
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, MetadataVersion, NodeToControllerChannelManager}
import org.apache.kafka.server.util.Scheduler

Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/ApiVersionManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.apache.kafka.common.feature.SupportedVersionRange
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.ApiVersionsResponse
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.server.{BrokerFeatures, ClientMetricsManager}
import org.apache.kafka.server.common.FinalizedFeatures
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class BrokerServer(

logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion())
metadataCache = new KRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion())

// Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery
// until we catch up on the metadata log and have up-to-date topic and broker configs.
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ConfigHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.metadata.ConfigRepository
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.storage.internals.log.LogConfig

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class ControllerServer(
authorizer = config.createNewAuthorizer()
authorizer.foreach(_.configure(config.originals))

metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion())
metadataCache = new KRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion())

metadataCachePublisher = new KRaftMetadataCachePublisher(metadataCache)

Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/DelayedElectLeader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.kafka.common.requests.ApiError
import org.apache.kafka.server.purgatory.DelayedOperation

import scala.collection.{Map, mutable}
import scala.jdk.javaapi.OptionConverters

/** A delayed elect leader operation that can be created by the replica manager and watched
* in the elect leader purgatory
Expand Down Expand Up @@ -74,7 +75,7 @@ class DelayedElectLeader(
private def updateWaiting(): Unit = {
val metadataCache = replicaManager.metadataCache
val completedPartitions = waitingPartitions.collect {
case (tp, leader) if metadataCache.getLeaderAndIsr(tp.topic, tp.partition).exists(_.leader == leader) => tp
case (tp, leader) if OptionConverters.toScala(metadataCache.getLeaderAndIsr(tp.topic, tp.partition)).exists(_.leader == leader) => tp
}
completedPartitions.foreach { tp =>
waitingPartitions -= tp
Expand Down
Loading

0 comments on commit 82f5cf6

Please sign in to comment.