KAFKA-17565: Move MetadataCache interface to metadata module #18801

Status: Open. Wants to merge 20 commits into base: trunk. Changes from 1 commit.
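One pattern runs through the whole diff: call sites replace the Scala-facing kafka.server.MetadataCache with the Java interface org.apache.kafka.metadata.MetadataCache. Java callers drop their scala.jdk.javaapi.CollectionConverters/OptionConverters shims, while Scala callers either work with java.util.Optional directly or add a converter at the boundary. A minimal sketch of the Scala side of that pattern, assuming getAliveBrokerEpoch now returns java.util.Optional[java.lang.Long] as the Replica.scala hunk below suggests:

```scala
import org.apache.kafka.metadata.MetadataCache
import scala.jdk.javaapi.OptionConverters

// Illustrative helper (not part of this PR): bridge the Java Optional
// back to a Scala Option at a call site that still wants Scala types.
def aliveBrokerEpoch(cache: MetadataCache, brokerId: Int): Option[java.lang.Long] =
  OptionConverters.toScala(cache.getAliveBrokerEpoch(brokerId))
```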
2 changes: 1 addition & 1 deletion checkstyle/import-control-core.xml
@@ -71,7 +71,7 @@
</subpackage>

<subpackage name="coordinator">
<allow class="kafka.server.MetadataCache" />
<allow class="org.apache.kafka.metadata.MetadataCache" />
</subpackage>

<subpackage name="docker">
3 changes: 3 additions & 0 deletions checkstyle/import-control-metadata.xml
@@ -144,12 +144,15 @@
</subpackage>

<subpackage name="metadata">
<allow pkg="org.apache.kafka.admin" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.acl" />
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
<allow pkg="org.apache.kafka.common.record" />
@@ -26,7 +26,6 @@
import kafka.server.ForwardingManager;
import kafka.server.KafkaApis;
import kafka.server.KafkaConfig;
- import kafka.server.MetadataCache;
import kafka.server.QuotaFactory.QuotaManagers;
import kafka.server.ReplicaManager;
import kafka.server.share.SharePartitionManager;
@@ -36,6 +35,7 @@
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.coordinator.share.ShareCoordinator;
import org.apache.kafka.metadata.ConfigRepository;
+ import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.ClientMetricsManager;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
@@ -20,12 +20,12 @@
import kafka.log.LogManager;
import kafka.server.AlterPartitionManager;
import kafka.server.KafkaConfig;
- import kafka.server.MetadataCache;
import kafka.server.QuotaFactory.QuotaManagers;
import kafka.server.ReplicaManager;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
+ import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.DelayedActionQueue;
import org.apache.kafka.server.common.DirectoryEventHandler;
import org.apache.kafka.server.util.Scheduler;
@@ -20,7 +20,6 @@
import kafka.network.RequestChannel;
import kafka.server.AuthHelper;
import kafka.server.KafkaConfig;
- import kafka.server.MetadataCache;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
@@ -31,15 +30,14 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
import org.apache.kafka.common.resource.Resource;
+ import org.apache.kafka.metadata.MetadataCache;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

- import scala.jdk.javaapi.CollectionConverters;

import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
import static org.apache.kafka.common.resource.ResourceType.TOPIC;

@@ -65,7 +63,7 @@ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(
DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
String cursorTopicName = cursor != null ? cursor.topicName() : "";
if (fetchAllTopics) {
- CollectionConverters.asJavaCollection(metadataCache.getAllTopics()).forEach(topicName -> {
+ metadataCache.getAllTopics().forEach(topicName -> {
if (topicName.compareTo(cursorTopicName) >= 0) {
topics.add(topicName);
}
@@ -105,7 +103,7 @@ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(
});

DescribeTopicPartitionsResponseData response = metadataCache.describeTopicResponse(
- CollectionConverters.asScala(authorizedTopicsStream.iterator()),
+ authorizedTopicsStream.iterator(),
abstractRequest.context().listenerName,
(String topicName) -> topicName.equals(cursorTopicName) ? cursor.partitionIndex() : 0,
Math.max(Math.min(config.maxRequestPartitionSizeLimit(), request.responsePartitionLimit()), 1),
@@ -17,13 +17,12 @@

package kafka.server.share;

- import kafka.server.MetadataCache;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
+ import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;

@@ -37,9 +36,6 @@
import java.util.Set;
import java.util.function.Function;

- import scala.jdk.javaapi.CollectionConverters;
- import scala.jdk.javaapi.OptionConverters;

public class ShareCoordinatorMetadataCacheHelperImpl implements ShareCoordinatorMetadataCacheHelper {
private final MetadataCache metadataCache;
private final Function<SharePartitionKey, Integer> keyToPartitionMapper;
@@ -73,13 +69,11 @@ public Node getShareCoordinator(SharePartitionKey key, String internalTopicName)
Set<String> topicSet = new HashSet<>();
topicSet.add(internalTopicName);

- List<MetadataResponseData.MetadataResponseTopic> topicMetadata = CollectionConverters.asJava(
-     metadataCache.getTopicMetadata(
-         CollectionConverters.asScala(topicSet),
-         interBrokerListenerName,
-         false,
-         false
-     )
+ List<MetadataResponseData.MetadataResponseTopic> topicMetadata = metadataCache.getTopicMetadata(
+     topicSet,
+     interBrokerListenerName,
+     false,
+     false
);

if (topicMetadata == null || topicMetadata.isEmpty() || topicMetadata.get(0).errorCode() != Errors.NONE.code()) {
@@ -92,7 +86,7 @@ public Node getShareCoordinator(SharePartitionKey key, String internalTopicName)
.findFirst();

if (response.isPresent()) {
- return OptionConverters.toJava(metadataCache.getAliveBrokerNode(response.get().leaderId(), interBrokerListenerName))
+ return metadataCache.getAliveBrokerNode(response.get().leaderId(), interBrokerListenerName)
.orElse(Node.noNode());
} else {
return Node.noNode();
@@ -108,7 +102,7 @@ public Node getShareCoordinator(SharePartitionKey key, String internalTopicName)
@Override
public List<Node> getClusterNodes() {
try {
- return CollectionConverters.asJava(metadataCache.getAliveBrokerNodes(interBrokerListenerName).toSeq());
+ return metadataCache.getAliveBrokerNodes(interBrokerListenerName);
} catch (Exception e) {
log.warn("Exception while getting cluster nodes", e);
}
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.utils.Time
- import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
+ import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, VerificationGuard}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
@@ -1069,9 +1069,9 @@ class Partition(val topicPartition: TopicPartition,
isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)
}

- private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long], cachedBrokerEpoch: Option[Long]): Boolean = {
-   storedBrokerEpoch.isDefined && cachedBrokerEpoch.isDefined &&
-     (storedBrokerEpoch.get == -1 || storedBrokerEpoch == cachedBrokerEpoch)
+ private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long], cachedBrokerEpoch: Optional[java.lang.Long]): Boolean = {
+   storedBrokerEpoch.isDefined && cachedBrokerEpoch.isPresent &&
+     (storedBrokerEpoch.get == -1 || storedBrokerEpoch.get == cachedBrokerEpoch.get)
}

/*
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/cluster/Replica.scala
@@ -17,10 +17,10 @@

package kafka.cluster

- import kafka.server.MetadataCache
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+ import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, UnifiedLog}

import java.util.concurrent.atomic.AtomicReference
@@ -113,7 +113,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition, val metadat
replicaState.updateAndGet { currentReplicaState =>
val cachedBrokerEpoch = metadataCache.getAliveBrokerEpoch(brokerId)
// Fence the update if it provides a stale broker epoch.
- if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {
+ if (brokerEpoch != -1 && cachedBrokerEpoch.isPresent && cachedBrokerEpoch.get > brokerEpoch) {
Review comment (Member):
Simplify to filter + isPresent
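A minimal sketch of that suggestion, assuming cachedBrokerEpoch is a java.util.Optional[java.lang.Long]: the epoch comparison moves into Optional.filter, leaving a single presence check.

```scala
// Sketch of the suggested filter + isPresent form: fence the update
// if the cached broker epoch is newer than the one provided.
if (brokerEpoch != -1 && cachedBrokerEpoch.filter(_ > brokerEpoch).isPresent) {
  throw new NotLeaderOrFollowerException(s"Received stale fetch state update. broker epoch=$brokerEpoch " +
    s"vs expected=${currentReplicaState.brokerEpoch.get}")
}
```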

throw new NotLeaderOrFollowerException(s"Received stale fetch state update. broker epoch=$brokerEpoch " +
s"vs expected=${currentReplicaState.brokerEpoch.get}")
}
@@ -16,7 +16,7 @@
*/
package kafka.coordinator.transaction

- import kafka.server.{KafkaConfig, MetadataCache, ReplicaManager}
+ import kafka.server.{KafkaConfig, ReplicaManager}
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
@@ -28,6 +28,7 @@ import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, TransactionResult}
import org.apache.kafka.common.utils.{LogContext, ProducerIdAndEpoch, Time}
import org.apache.kafka.coordinator.transaction.ProducerIdManager
+ import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import org.apache.kafka.server.util.Scheduler

@@ -21,7 +21,7 @@ import kafka.coordinator.transaction.TransactionMarkerChannelManager.{LogAppendR

import java.util
import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue}
- import kafka.server.{KafkaConfig, MetadataCache}
+ import kafka.server.KafkaConfig
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.common.metrics.Metrics
@@ -32,12 +32,14 @@ import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersReque
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition}
+ import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}

import scala.collection.{concurrent, immutable}
import scala.jdk.CollectionConverters._
+ import scala.jdk.javaapi.OptionConverters

object TransactionMarkerChannelManager {
private val UnknownDestinationQueueSizeMetricName = "UnknownDestinationQueueSize"
@@ -382,7 +384,7 @@
topicPartitions: immutable.Set[TopicPartition]): Unit = {
val txnTopicPartition = txnStateManager.partitionFor(pendingCompleteTxn.transactionalId)
val partitionsByDestination: immutable.Map[Option[Node], immutable.Set[TopicPartition]] = topicPartitions.groupBy { topicPartition: TopicPartition =>
- metadataCache.getPartitionLeaderEndpoint(topicPartition.topic, topicPartition.partition, interBrokerListenerName)
+ OptionConverters.toScala(metadataCache.getPartitionLeaderEndpoint(topicPartition.topic, topicPartition.partition, interBrokerListenerName))
}

val coordinatorEpoch = pendingCompleteTxn.coordinatorEpoch
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantReadWriteLock
- import kafka.server.{MetadataCache, ReplicaManager}
+ import kafka.server.ReplicaManager
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils.{Logging, Pool}
import org.apache.kafka.common.config.TopicConfig
@@ -35,6 +35,7 @@ import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
+ import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.server.record.BrokerCompressionType
@@ -26,13 +26,15 @@ import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartiti
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, MetadataResponse}
import org.apache.kafka.common.utils.Time
+ import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}

import java.util
import java.util.concurrent.TimeUnit
import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
+ import scala.jdk.javaapi.OptionConverters

object AddPartitionsToTxnManager {
type AppendCallback = Map[TopicPartition, Errors] => Unit
@@ -186,9 +188,9 @@ class AddPartitionsToTxnManager(
}

private def getTransactionCoordinator(partition: Int): Option[Node] = {
Review comment (Member):
we can use Java optional as it has only one usage
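A sketch of that shape, with the method itself returning java.util.Optional so no conversion is needed inside the chain (the signature change is hypothetical, not part of this diff):

```scala
// Stay in java.util.Optional end-to-end; the single caller adapts.
private def getTransactionCoordinator(partition: Int): java.util.Optional[Node] = {
  metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
    .filter(_.leader != MetadataResponse.NO_LEADER_ID)
    .flatMap(metadata => metadataCache.getAliveBrokerNode(metadata.leader, interBrokerListenerName))
}
```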

- metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
+ OptionConverters.toScala(metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition))
.filter(_.leader != MetadataResponse.NO_LEADER_ID)
- .flatMap(metadata => metadataCache.getAliveBrokerNode(metadata.leader, interBrokerListenerName))
+ .flatMap(metadata => OptionConverters.toScala(metadataCache.getAliveBrokerNode(metadata.leader, interBrokerListenerName)))
Review comment (Member):
I think this would be better if we keep the Java Optional and convert it at the end.
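The variant this comment describes, as a sketch: keep the Option[Node] signature, run the chain on the Java Optional, and apply OptionConverters.toScala once at the end.

```scala
private def getTransactionCoordinator(partition: Int): Option[Node] = {
  // One conversion at the boundary instead of one per step.
  OptionConverters.toScala(
    metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
      .filter(_.leader != MetadataResponse.NO_LEADER_ID)
      .flatMap(metadata => metadataCache.getAliveBrokerNode(metadata.leader, interBrokerListenerName))
  )
}
```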

}

private def topicPartitionsToError(transactionData: AddPartitionsToTxnTransaction, error: Errors): Map[TopicPartition, Errors] = {
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -20,6 +20,7 @@ import org.apache.kafka.common.feature.SupportedVersionRange
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.ApiVersionsResponse
+ import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.server.{BrokerFeatures, ClientMetricsManager}
import org.apache.kafka.server.common.FinalizedFeatures
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
@@ -206,7 +206,7 @@

logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

- metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion())
+ metadataCache = new KRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion())

// Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery
// until we catch up on the metadata log and have up-to-date topic and broker configs.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ConfigHelper.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
import org.apache.kafka.coordinator.group.GroupConfig
- import org.apache.kafka.metadata.ConfigRepository
+ import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.storage.internals.log.LogConfig

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ControllerServer.scala
@@ -140,7 +140,7 @@
authorizer = config.createNewAuthorizer()
authorizer.foreach(_.configure(config.originals))

- metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion())
+ metadataCache = new KRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion())

metadataCachePublisher = new KRaftMetadataCachePublisher(metadataCache)

3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/DelayedElectLeader.scala
@@ -24,6 +24,7 @@ import org.apache.kafka.common.requests.ApiError
import org.apache.kafka.server.purgatory.DelayedOperation

import scala.collection.{Map, mutable}
+ import scala.jdk.javaapi.OptionConverters

/** A delayed elect leader operation that can be created by the replica manager and watched
* in the elect leader purgatory
@@ -74,7 +75,7 @@
private def updateWaiting(): Unit = {
val metadataCache = replicaManager.metadataCache
val completedPartitions = waitingPartitions.collect {
- case (tp, leader) if metadataCache.getLeaderAndIsr(tp.topic, tp.partition).exists(_.leader == leader) => tp
+ case (tp, leader) if OptionConverters.toScala(metadataCache.getLeaderAndIsr(tp.topic, tp.partition)).exists(_.leader == leader) => tp
Review comment (Member):
The exists can be done in Java with filter + isPresent
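A minimal sketch of that form, assuming getLeaderAndIsr returns java.util.Optional[LeaderAndIsr]: Optional.filter plus isPresent replaces the toScala/exists round-trip.

```scala
// Sketch: a partition is complete once its current leader matches
// the expected one, checked entirely on the Java Optional.
val completedPartitions = waitingPartitions.collect {
  case (tp, leader) if metadataCache.getLeaderAndIsr(tp.topic, tp.partition)
    .filter(_.leader == leader).isPresent => tp
}
```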

}
completedPartitions.foreach { tp =>
waitingPartitions -= tp