-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17565: Move MetadataCache interface to metadata module #18801
base: trunk
Are you sure you want to change the base?
Conversation
_currentImage.cluster().brokers().values().asScala.filterNot(_.fenced()). | ||
flatMap(_.node(listenerName.value()).toScala).toSeq | ||
flatMap(_.node(listenerName.value()).toScala).toList.asJava |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that toList
converts to an immutable linked list - you do not want to do that if you are then calling asJava
. toBuffer
is probably a better option.
} | ||
|
||
override def getAliveBrokerNodes(listenerName: ListenerName): Seq[Node] = { | ||
override def getAliveBrokerNodes(listenerName: ListenerName): util.List[Node] = { | ||
_currentImage.cluster().brokers().values().asScala.filterNot(_.fenced()). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should remove asScala
from here and use stream
instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion. I will update it tomorrow.
I realize your PR is still wip, so feel free to ignore the two comments if you were still working through that code. :) |
fa0cd25
to
a3e92ef
Compare
6a2dc82
to
e0c881a
Compare
e0c881a
to
82f5cf6
Compare
@FrankYang0529 please fix the conflicts, thanks |
82f5cf6
to
f10d680
Compare
Signed-off-by: PoAn Yang <[email protected]>
f10d680
to
338f362
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529, thanks for this patch! I'm glad to see MetadataCache moving to Java :)
I left several minor comments inline. One overall comment is that I think it would be best to include the parens for no-arg function calls in the Scala code (only for Scala code already included/modified in this PR)
Since MetadataCache is in the hot path for many requests, it would make sense to run our benchmarks before/after this patch to make sure we don't introduce a performance regression.
} else { | ||
broker.metadataCache.numPartitions(topic) == Some(expectedNumPartitions) | ||
broker.metadataCache.numPartitions(topic).isPresent && broker.metadataCache.numPartitions(topic).get == expectedNumPartitions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can be simplified to use filter
or maybe orElse(null) =
flatMap(_.node(listenerName.value()).toScala) | ||
override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): util.Optional[Node] = { | ||
util.Optional.ofNullable(_currentImage.cluster.broker(brokerId)) | ||
.filter(broker => !broker.fenced) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With Java 11 we can now use Predicate.not
for negating filters with lambda. This can be
.filter(Predicate.not(_.fenced))
} | ||
|
||
override def getTopicId(topicName: String): Uuid = _currentImage.topics().topicsByName().asScala.get(topicName).map(_.id()).getOrElse(Uuid.ZERO_UUID) | ||
override def getTopicId(topicName: String): Uuid = util.Optional.ofNullable(_currentImage.topics.topicsByName.get(topicName)) | ||
.map(topic => topic.id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use the method reference style here: .map(_.id)
val image = _currentImage | ||
topics.toSeq.flatMap { topic => | ||
topics.asScala.toSeq.flatMap { topic => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we avoid this conversion to Scala and back to Java collection? Since topics
is now a Java set, it seems like we can use a Java stream here.
@@ -2383,7 +2385,7 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
() => { | |||
val brokers = new DescribeClusterResponseData.DescribeClusterBrokerCollection() | |||
val describeClusterRequest = request.body[DescribeClusterRequest] | |||
metadataCache.getBrokerNodes(request.context.listenerName).foreach { node => | |||
metadataCache.getBrokerNodes(request.context.listenerName).asScala.foreach { node => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to convert to Scala for the foreach
here? Could we use the Java stream and its forEach
instead?
@@ -1193,8 +1195,8 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
val coordinatorEndpoint = topicMetadata.head.partitions.asScala | |||
.find(_.partitionIndex == partition) | |||
.filter(_.leaderId != MetadataResponse.NO_LEADER_ID) | |||
.flatMap(metadata => metadataCache. | |||
getAliveBrokerNode(metadata.leaderId, request.context.listenerName)) | |||
.flatMap(metadata => OptionConverters.toScala(metadataCache. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we let topicMetadata
stay as as a Java collection and convert this whole stream to a Java stream?
@@ -74,7 +75,7 @@ class DelayedElectLeader( | |||
private def updateWaiting(): Unit = { | |||
val metadataCache = replicaManager.metadataCache | |||
val completedPartitions = waitingPartitions.collect { | |||
case (tp, leader) if metadataCache.getLeaderAndIsr(tp.topic, tp.partition).exists(_.leader == leader) => tp | |||
case (tp, leader) if OptionConverters.toScala(metadataCache.getLeaderAndIsr(tp.topic, tp.partition)).exists(_.leader == leader) => tp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exists
can be done in Java with filter
+ isPresent
OptionConverters.toScala(metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)) | ||
.filter(_.leader != MetadataResponse.NO_LEADER_ID) | ||
.flatMap(metadata => metadataCache.getAliveBrokerNode(metadata.leader, interBrokerListenerName)) | ||
.flatMap(metadata => OptionConverters.toScala(metadataCache.getAliveBrokerNode(metadata.leader, interBrokerListenerName))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this would be better if we keep the Java Optional and convert it at the end.
@@ -113,7 +113,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition, val metadat | |||
replicaState.updateAndGet { currentReplicaState => | |||
val cachedBrokerEpoch = metadataCache.getAliveBrokerEpoch(brokerId) | |||
// Fence the update if it provides a stale broker epoch. | |||
if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) { | |||
if (brokerEpoch != -1 && cachedBrokerEpoch.isPresent && cachedBrokerEpoch.get > brokerEpoch) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simplify to filter
+ isPresent
Signed-off-by: PoAn Yang <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 thanks for this patch. a couple of comments are left. PTAL
image.cluster().brokers().values().asScala.filterNot(_.fenced()). | ||
map(b => new BrokerMetadata(b.id, b.rack)) | ||
private def getAliveBrokers(image: MetadataImage): util.List[BrokerMetadata] = { | ||
_currentImage.cluster().brokers().values().stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_currentImage
-> image
*/ | ||
Optional<Integer> numPartitions(String topic); | ||
|
||
Map<String, Uuid> topicNamesToIds(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is useless - maybe we can remove it to simplify the interface.
|
||
Map<Uuid, String> topicIdsToNames(); | ||
|
||
Map.Entry<Map<String, Uuid>, Map<Uuid, String>> topicIdInfo(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
|
||
Map<Integer, Node> getPartitionReplicaEndpoints(TopicPartition tp, ListenerName listenerName); | ||
|
||
Cluster getClusterMetadata(String clusterId, ListenerName listenerName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
val image = _currentImage | ||
val result = new mutable.HashMap[Int, Node]() | ||
val result = new mutable.HashMap[Integer, Node]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you please use java Map instead?
.setTopicId(Option(image.topics().getTopic(topic).id()).getOrElse(Uuid.ZERO_UUID)) | ||
.setIsInternal(Topic.isInternal(topic)) | ||
.setPartitions(partitionMetadata.toBuffer.asJava)) | ||
case None => util.stream.Stream.empty() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please remove the redundant space
@@ -186,9 +188,9 @@ class AddPartitionsToTxnManager( | |||
} | |||
|
|||
private def getTransactionCoordinator(partition: Int): Option[Node] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can use Java optional as it has only one usage
@@ -290,7 +290,7 @@ default void waitForTopic(String topic, int partitions) throws InterruptedExcept | |||
TestUtils.waitForCondition( | |||
() -> brokers.stream().allMatch(broker -> partitions == 0 ? | |||
broker.metadataCache().numPartitions(topic).isEmpty() : | |||
broker.metadataCache().numPartitions(topic).contains(partitions) | |||
broker.metadataCache().numPartitions(topic).get() == partitions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
broker.metadataCache().numPartitions(topic).filter(p -> p == partitions).isPresent()
maximumNumberOfPartitions: Int, | ||
ignoreTopicsWithExceptions: Boolean | ||
): DescribeTopicPartitionsResponseData = { | ||
val image = _currentImage | ||
var remaining = maximumNumberOfPartitions | ||
val result = new DescribeTopicPartitionsResponseData() | ||
breakable { | ||
topics.foreach { topicName => | ||
topics.asScala.foreach { topicName => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
forEachRemaining
Signed-off-by: PoAn Yang <[email protected]>
Signed-off-by: PoAn Yang <[email protected]>
Signed-off-by: PoAn Yang <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 thanks for your patch. I leave some comments to cleanup the interface
|
||
boolean hasAliveBroker(int brokerId); | ||
|
||
List<BrokerMetadata> getAliveBrokers(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is used by test infra only, and we can rewrite the check by hasAliveBroker
.
before
TestUtils.waitForCondition(() ->
brokers().values().stream().allMatch(brokerServer -> brokerServer.metadataCache().getAliveBrokers().size() == brokers.size()),
"Failed to wait for publisher to publish the metadata update to each broker.");
after
TestUtils.waitForCondition(() ->
brokers.values().stream().map(BrokerServer::metadataCache)
.allMatch(cache -> brokers.values().stream().map(b -> b.config().brokerId()).allMatch(cache::hasAliveBroker)),
"Failed to wait for publisher to publish the metadata update to each broker.");
|
||
Set<String> getAllTopics(); | ||
|
||
Set<TopicPartition> getTopicPartitions(String topicName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems the usage of this method is to get the number of partitions. could you please rewrite the tests to use numPartitions
instead?
Signed-off-by: PoAn Yang <[email protected]>
Signed-off-by: PoAn Yang <[email protected]>
ReplicaFetcherThreadBenchmark
KRaftMetadataRequestBenchmark
PartitionMakeFollowerBenchmark
UpdateFollowerFetchStateBenchmark
CheckpointBench
PartitionCreationBench