
KAFKA-17565: Move MetadataCache interface to metadata module #18801

Open
wants to merge 20 commits into trunk

Conversation


@FrankYang0529 FrankYang0529 commented Feb 4, 2025

  • ReplicaFetcherThreadBenchmark

    ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.fetcher.ReplicaFetcherThreadBenchmark
    
    • trunk
    Benchmark                                  (partitionCount)  Mode  Cnt       Score   Error  Units
    ReplicaFetcherThreadBenchmark.testFetcher               100  avgt    2    4775.490          ns/op
    ReplicaFetcherThreadBenchmark.testFetcher               500  avgt    2   25730.790          ns/op
    ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt    2   55334.206          ns/op
    ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt    2  488427.547          ns/op
    
    • branch
    Benchmark                                  (partitionCount)  Mode  Cnt       Score   Error  Units
    ReplicaFetcherThreadBenchmark.testFetcher               100  avgt    2    4825.219          ns/op
    ReplicaFetcherThreadBenchmark.testFetcher               500  avgt    2   25985.662          ns/op
    ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt    2   56056.005          ns/op
    ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt    2  497138.573          ns/op
    
  • KRaftMetadataRequestBenchmark

    ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.metadata.KRaftMetadataRequestBenchmark
    
    • trunk
    Benchmark                                                      (partitionCount)  (topicCount)  Mode  Cnt         Score   Error  Units
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10           500  avgt    2    884933.558          ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10          1000  avgt    2   1910054.621          ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10          5000  avgt    2  21778869.337          ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20           500  avgt    2   1537550.670          ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20          1000  avgt    2   3168237.805          ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20          5000  avgt    2  29699652.466          ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50           500  avgt    2   3501483.852          ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50          1000  avgt    2   7405481.182          ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50          5000  avgt    2  55839670.124          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              10           500  avgt    2       333.667          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              10          1000  avgt    2       339.685          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              10          5000  avgt    2       334.293          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              20           500  avgt    2       329.899          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              20          1000  avgt    2       347.537          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              20          5000  avgt    2       332.781          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              50           500  avgt    2       327.085          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              50          1000  avgt    2       325.206          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              50          5000  avgt    2       316.758          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                10           500  avgt    2         7.569          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                10          1000  avgt    2         7.565          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                10          5000  avgt    2         7.574          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                20           500  avgt    2         7.568          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                20          1000  avgt    2         7.557          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                20          5000  avgt    2         7.585          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                50           500  avgt    2         7.560          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                50          1000  avgt    2         7.554          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                50          5000  avgt    2         7.574          ns/op
    
    • branch
    Benchmark                                                      (partitionCount)  (topicCount)  Mode  Cnt         Score   Error  Units
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10           500  avgt    2    910337.770          ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10          1000  avgt    2   1902351.360          ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10          5000  avgt    2  22215893.338          ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20           500  avgt    2   1572683.875          ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20          1000  avgt    2   3188560.081          ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20          5000  avgt    2  29984751.632          ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50           500  avgt    2   3413567.549          ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50          1000  avgt    2   7303174.254          ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50          5000  avgt    2  54293721.640          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              10           500  avgt    2       318.335          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              10          1000  avgt    2       331.386          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              10          5000  avgt    2       332.944          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              20           500  avgt    2       340.322          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              20          1000  avgt    2       330.294          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              20          5000  avgt    2       342.154          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              50           500  avgt    2       341.053          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              50          1000  avgt    2       335.458          ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              50          5000  avgt    2       322.050          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                10           500  avgt    2         7.538          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                10          1000  avgt    2         7.548          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                10          5000  avgt    2         7.545          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                20           500  avgt    2         7.597          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                20          1000  avgt    2         7.567          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                20          5000  avgt    2         7.558          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                50           500  avgt    2         7.559          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                50          1000  avgt    2         7.615          ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                50          5000  avgt    2         7.562          ns/op
    
  • PartitionMakeFollowerBenchmark

    ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.partition.PartitionMakeFollowerBenchmark
    
    • trunk
    Benchmark                                        Mode  Cnt    Score   Error  Units
    PartitionMakeFollowerBenchmark.testMakeFollower  avgt    2  158.816          ns/op
    
    • branch
    Benchmark                                        Mode  Cnt    Score   Error  Units
    PartitionMakeFollowerBenchmark.testMakeFollower  avgt    2  160.533          ns/op
    
  • UpdateFollowerFetchStateBenchmark

    ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.partition.UpdateFollowerFetchStateBenchmark
    
    • trunk
    Benchmark                                                                Mode  Cnt     Score   Error  Units
    UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBench          avgt    2  4975.261          ns/op
    UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBenchNoChange  avgt    2  4880.880          ns/op
    
    • branch
    Benchmark                                                                Mode  Cnt     Score   Error  Units
    UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBench          avgt    2  5020.722          ns/op
    UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBenchNoChange  avgt    2  4878.855          ns/op
    
  • CheckpointBench

    ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.server.CheckpointBench
    
    • trunk
    Benchmark                                         (numPartitions)  (numTopics)   Mode  Cnt  Score   Error   Units
    CheckpointBench.measureCheckpointHighWatermarks                 3          100  thrpt    2  0.997          ops/ms
    CheckpointBench.measureCheckpointHighWatermarks                 3         1000  thrpt    2  0.703          ops/ms
    CheckpointBench.measureCheckpointHighWatermarks                 3         2000  thrpt    2  0.486          ops/ms
    CheckpointBench.measureCheckpointLogStartOffsets                3          100  thrpt    2  1.038          ops/ms
    CheckpointBench.measureCheckpointLogStartOffsets                3         1000  thrpt    2  0.734          ops/ms
    CheckpointBench.measureCheckpointLogStartOffsets                3         2000  thrpt    2  0.637          ops/ms
    
    • branch
    Benchmark                                         (numPartitions)  (numTopics)   Mode  Cnt  Score   Error   Units
    CheckpointBench.measureCheckpointHighWatermarks                 3          100  thrpt    2  0.990          ops/ms
    CheckpointBench.measureCheckpointHighWatermarks                 3         1000  thrpt    2  0.659          ops/ms
    CheckpointBench.measureCheckpointHighWatermarks                 3         2000  thrpt    2  0.508          ops/ms
    CheckpointBench.measureCheckpointLogStartOffsets                3          100  thrpt    2  0.923          ops/ms
    CheckpointBench.measureCheckpointLogStartOffsets                3         1000  thrpt    2  0.736          ops/ms
    CheckpointBench.measureCheckpointLogStartOffsets                3         2000  thrpt    2  0.637          ops/ms
    
  • PartitionCreationBench

    ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.server.PartitionCreationBench
    
    • trunk
    Benchmark                            (numPartitions)  (useTopicIds)  Mode  Cnt  Score   Error  Units
    PartitionCreationBench.makeFollower               20          false  avgt    2  5.997          ms/op
    PartitionCreationBench.makeFollower               20           true  avgt    2  6.961          ms/op
    
    • branch
    Benchmark                            (numPartitions)  (useTopicIds)  Mode  Cnt  Score   Error  Units
    PartitionCreationBench.makeFollower               20          false  avgt    2  6.212          ms/op
    PartitionCreationBench.makeFollower               20           true  avgt    2  7.005          ms/op
    

@github-actions github-actions bot added core Kafka Broker tools performance kraft KIP-932 Queues for Kafka build Gradle build or GitHub Actions labels Feb 4, 2025
_currentImage.cluster().brokers().values().asScala.filterNot(_.fenced()).
flatMap(_.node(listenerName.value()).toScala).toSeq
flatMap(_.node(listenerName.value()).toScala).toList.asJava
Member

Note that toList converts to an immutable linked list - you do not want to do that if you are then calling asJava. toBuffer is probably a better option.
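A minimal sketch of the difference, using a hypothetical sequence of node names (the real code collects Node objects): toList.asJava wraps an immutable linked list, while toBuffer.asJava wraps an array-backed buffer with constant-time indexed access.

    import java.util
    import scala.jdk.CollectionConverters._

    val nodes = Seq("broker-0", "broker-1", "broker-2")

    // toList builds an immutable scala.List (a linked list); the asJava wrapper's get(i) is O(i).
    val linked: util.List[String] = nodes.toList.asJava

    // toBuffer is array-backed, so the asJava wrapper keeps O(1) indexed access.
    val buffered: util.List[String] = nodes.toBuffer.asJava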

}

override def getAliveBrokerNodes(listenerName: ListenerName): Seq[Node] = {
override def getAliveBrokerNodes(listenerName: ListenerName): util.List[Node] = {
_currentImage.cluster().brokers().values().asScala.filterNot(_.fenced()).
Member

We should remove asScala from here and use stream instead.
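A minimal sketch of the stream-based shape, assuming hypothetical Broker and Node stand-ins rather than the real BrokerRegistration API:

    import java.util
    import java.util.Optional
    import java.util.stream.Collectors

    final case class Node(id: Int)
    final case class Broker(id: Int, fenced: Boolean, endpoint: Optional[Node])

    def aliveBrokerNodes(brokers: util.Collection[Broker]): util.List[Node] =
      brokers.stream()
        .filter(b => !b.fenced)                   // keep only unfenced brokers
        .flatMap[Node](b => b.endpoint.stream())  // Optional.stream() (Java 9+) drops empty endpoints
        .collect(Collectors.toList())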

Member Author

Thanks for the suggestion. I will update it tomorrow.


ijuma commented Feb 4, 2025

I realize your PR is still wip, so feel free to ignore the two comments if you were still working through that code. :)

@FrankYang0529 FrankYang0529 changed the title KAFKA-17565: Move MetadataCache interface to metadata module (wip) KAFKA-17565: Move MetadataCache interface to metadata module Feb 10, 2025
@chia7712

@FrankYang0529 please fix the conflicts, thanks

@mumrah mumrah left a comment

@FrankYang0529, thanks for this patch! I'm glad to see MetadataCache moving to Java :)

I left several minor comments inline. One overall comment is that I think it would be best to include the parens for no-arg function calls in the Scala code (only for Scala code already included/modified in this PR)

Since MetadataCache is in the hot path for many requests, it would make sense to run our benchmarks before/after this patch to make sure we don't introduce a performance regression.

} else {
broker.metadataCache.numPartitions(topic) == Some(expectedNumPartitions)
broker.metadataCache.numPartitions(topic).isPresent && broker.metadataCache.numPartitions(topic).get == expectedNumPartitions
Member

I think this can be simplified to use filter, or maybe orElse(null).
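A minimal sketch of the filter variant, with a hypothetical Optional standing in for numPartitions(topic):

    import java.util.Optional

    val numPartitions: Optional[Integer] = Optional.of(3)
    val expectedNumPartitions = 3

    // One Optional chain instead of calling numPartitions twice for isPresent and get.
    val matches = numPartitions.filter(n => n.intValue() == expectedNumPartitions).isPresent()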

flatMap(_.node(listenerName.value()).toScala)
override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): util.Optional[Node] = {
util.Optional.ofNullable(_currentImage.cluster.broker(brokerId))
.filter(broker => !broker.fenced)
Member

With Java 11 we can now use Predicate.not for negating filters with lambda. This can be

.filter(Predicate.not(_.fenced))
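A minimal sketch of that shape, with a hypothetical Broker case class standing in for the broker registration:

    import java.util.Optional
    import java.util.function.Predicate

    final case class Broker(id: Int, fenced: Boolean)

    val registration: Optional[Broker] = Optional.of(Broker(1, fenced = false))

    // Negate the predicate itself (Java 11+) instead of writing broker => !broker.fenced.
    val alive: Optional[Broker] = registration.filter(Predicate.not[Broker]((b: Broker) => b.fenced))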

}

override def getTopicId(topicName: String): Uuid = _currentImage.topics().topicsByName().asScala.get(topicName).map(_.id()).getOrElse(Uuid.ZERO_UUID)
override def getTopicId(topicName: String): Uuid = util.Optional.ofNullable(_currentImage.topics.topicsByName.get(topicName))
.map(topic => topic.id)
Member

We can use the method reference style here: .map(_.id)

val image = _currentImage
topics.toSeq.flatMap { topic =>
topics.asScala.toSeq.flatMap { topic =>
Member

Can we avoid this conversion to Scala and back to Java collection? Since topics is now a Java set, it seems like we can use a Java stream here.

@@ -2383,7 +2385,7 @@ class KafkaApis(val requestChannel: RequestChannel,
() => {
val brokers = new DescribeClusterResponseData.DescribeClusterBrokerCollection()
val describeClusterRequest = request.body[DescribeClusterRequest]
metadataCache.getBrokerNodes(request.context.listenerName).foreach { node =>
metadataCache.getBrokerNodes(request.context.listenerName).asScala.foreach { node =>
Member

Do we need to convert to Scala for the foreach here? Could we use the Java stream and its forEach instead?
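A minimal sketch with a hypothetical list of node names: java.util.List already exposes forEach(Consumer), so a Scala lambda can be passed directly without converting with asScala (going through stream().forEach is equivalent).

    import java.util

    val brokerNodes: util.List[String] = util.List.of("broker-0", "broker-1")

    // forEach takes a java.util.function.Consumer, which a Scala lambda satisfies via SAM conversion.
    brokerNodes.forEach(node => println(s"adding $node"))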

@@ -1193,8 +1195,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val coordinatorEndpoint = topicMetadata.head.partitions.asScala
.find(_.partitionIndex == partition)
.filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
.flatMap(metadata => metadataCache.
getAliveBrokerNode(metadata.leaderId, request.context.listenerName))
.flatMap(metadata => OptionConverters.toScala(metadataCache.
Member

Can we let topicMetadata stay as a Java collection and convert this whole stream to a Java stream?

@@ -74,7 +75,7 @@ class DelayedElectLeader(
private def updateWaiting(): Unit = {
val metadataCache = replicaManager.metadataCache
val completedPartitions = waitingPartitions.collect {
case (tp, leader) if metadataCache.getLeaderAndIsr(tp.topic, tp.partition).exists(_.leader == leader) => tp
case (tp, leader) if OptionConverters.toScala(metadataCache.getLeaderAndIsr(tp.topic, tp.partition)).exists(_.leader == leader) => tp
Member

The exists can be done in Java with filter + isPresent

Comment on lines 191 to 193
OptionConverters.toScala(metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition))
.filter(_.leader != MetadataResponse.NO_LEADER_ID)
.flatMap(metadata => metadataCache.getAliveBrokerNode(metadata.leader, interBrokerListenerName))
.flatMap(metadata => OptionConverters.toScala(metadataCache.getAliveBrokerNode(metadata.leader, interBrokerListenerName)))
Member

I think this would be better if we keep the Java Optional and convert it at the end.
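A minimal sketch of keeping the whole chain on java.util.Optional and converting to a Scala Option only once at the end, using hypothetical stand-ins for the leader/ISR and broker lookups:

    import java.util.Optional
    import scala.jdk.OptionConverters._

    final case class LeaderAndIsr(leader: Int)
    final case class Node(id: Int)

    val noLeaderId = -1  // stand-in for MetadataResponse.NO_LEADER_ID
    def getLeaderAndIsr(partition: Int): Optional[LeaderAndIsr] = Optional.of(LeaderAndIsr(leader = 2))
    def getAliveBrokerNode(brokerId: Int): Optional[Node] = Optional.of(Node(brokerId))

    val coordinator: Option[Node] =
      getLeaderAndIsr(0)
        .filter(l => l.leader != noLeaderId)               // stay on java.util.Optional
        .flatMap[Node](l => getAliveBrokerNode(l.leader))  // still java.util.Optional
        .toScala                                           // single conversion at the very end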

@@ -113,7 +113,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition, val metadat
replicaState.updateAndGet { currentReplicaState =>
val cachedBrokerEpoch = metadataCache.getAliveBrokerEpoch(brokerId)
// Fence the update if it provides a stale broker epoch.
if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {
if (brokerEpoch != -1 && cachedBrokerEpoch.isPresent && cachedBrokerEpoch.get > brokerEpoch) {
Member

Simplify to filter + isPresent

@chia7712 chia7712 left a comment

@FrankYang0529, thanks for this patch. I left a couple of comments. PTAL.

image.cluster().brokers().values().asScala.filterNot(_.fenced()).
map(b => new BrokerMetadata(b.id, b.rack))
private def getAliveBrokers(image: MetadataImage): util.List[BrokerMetadata] = {
_currentImage.cluster().brokers().values().stream()
Member

_currentImage -> image

*/
Optional<Integer> numPartitions(String topic);

Map<String, Uuid> topicNamesToIds();
Member

it is useless - maybe we can remove it to simplify the interface.


Map<Uuid, String> topicIdsToNames();

Map.Entry<Map<String, Uuid>, Map<Uuid, String>> topicIdInfo();
Member

ditto


Map<Integer, Node> getPartitionReplicaEndpoints(TopicPartition tp, ListenerName listenerName);

Cluster getClusterMetadata(String clusterId, ListenerName listenerName);
Member

ditto

val image = _currentImage
val result = new mutable.HashMap[Int, Node]()
val result = new mutable.HashMap[Integer, Node]()
Member

Could you please use a Java Map instead?
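A minimal sketch of building the result directly as a java.util.Map, with a hypothetical Node stand-in:

    import java.util

    final case class Node(id: Int)

    // Returning a java.util.Map avoids the mutable.HashMap[Integer, Node] plus an asJava wrapper.
    val result = new util.HashMap[Integer, Node]()
    result.put(0, Node(0))
    result.put(1, Node(1))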

.setTopicId(Option(image.topics().getTopic(topic).id()).getOrElse(Uuid.ZERO_UUID))
.setIsInternal(Topic.isInternal(topic))
.setPartitions(partitionMetadata.toBuffer.asJava))
case None => util.stream.Stream.empty()
Member

please remove the redundant space

@@ -186,9 +188,9 @@ class AddPartitionsToTxnManager(
}

private def getTransactionCoordinator(partition: Int): Option[Node] = {
Member

We can use a Java Optional here, as it has only one usage.

@@ -290,7 +290,7 @@ default void waitForTopic(String topic, int partitions) throws InterruptedExcept
TestUtils.waitForCondition(
() -> brokers.stream().allMatch(broker -> partitions == 0 ?
broker.metadataCache().numPartitions(topic).isEmpty() :
broker.metadataCache().numPartitions(topic).contains(partitions)
broker.metadataCache().numPartitions(topic).get() == partitions
Member

broker.metadataCache().numPartitions(topic).filter(p -> p == partitions).isPresent()

maximumNumberOfPartitions: Int,
ignoreTopicsWithExceptions: Boolean
): DescribeTopicPartitionsResponseData = {
val image = _currentImage
var remaining = maximumNumberOfPartitions
val result = new DescribeTopicPartitionsResponseData()
breakable {
topics.foreach { topicName =>
topics.asScala.foreach { topicName =>
Member

forEachRemaining

@FrankYang0529 FrankYang0529 requested a review from chia7712 March 4, 2025 01:42
@chia7712 chia7712 left a comment

@FrankYang0529, thanks for your patch. I left some comments to clean up the interface.


boolean hasAliveBroker(int brokerId);

List<BrokerMetadata> getAliveBrokers();
Member

This is used by the test infra only, and we can rewrite the check with hasAliveBroker.

before

        TestUtils.waitForCondition(() ->
                brokers().values().stream().allMatch(brokerServer -> brokerServer.metadataCache().getAliveBrokers().size() == brokers.size()),
            "Failed to wait for publisher to publish the metadata update to each broker.");

after

        TestUtils.waitForCondition(() ->
                brokers.values().stream().map(BrokerServer::metadataCache)
                    .allMatch(cache -> brokers.values().stream().map(b -> b.config().brokerId()).allMatch(cache::hasAliveBroker)),
            "Failed to wait for publisher to publish the metadata update to each broker.");


Set<String> getAllTopics();

Set<TopicPartition> getTopicPartitions(String topicName);
Member

It seems the usage of this method is to get the number of partitions. Could you please rewrite the tests to use numPartitions instead?

@FrankYang0529 FrankYang0529 requested a review from chia7712 March 4, 2025 23:50