-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17565: Move MetadataCache interface to metadata module #18801
base: trunk
Are you sure you want to change the base?
Changes from 1 commit
338f362
52c906f
7eb2983
fe9439c
93c1609
defb6b9
3124b0e
8acb30e
570bce5
1fa89e6
568b8c2
2793e49
8d9f259
94b15b3
0f4378a
5d1f048
1b6f636
c35487a
0fdcedc
756b701
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,13 +26,15 @@ import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartiti | |
import org.apache.kafka.common.protocol.Errors | ||
import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, MetadataResponse} | ||
import org.apache.kafka.common.utils.Time | ||
import org.apache.kafka.metadata.MetadataCache | ||
import org.apache.kafka.server.metrics.KafkaMetricsGroup | ||
import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler} | ||
|
||
import java.util | ||
import java.util.concurrent.TimeUnit | ||
import scala.collection.{Seq, mutable} | ||
import scala.jdk.CollectionConverters._ | ||
import scala.jdk.javaapi.OptionConverters | ||
|
||
object AddPartitionsToTxnManager { | ||
type AppendCallback = Map[TopicPartition, Errors] => Unit | ||
|
@@ -186,9 +188,9 @@ class AddPartitionsToTxnManager( | |
} | ||
|
||
private def getTransactionCoordinator(partition: Int): Option[Node] = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can use Java optional as it has only one usage |
||
metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition) | ||
OptionConverters.toScala(metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)) | ||
.filter(_.leader != MetadataResponse.NO_LEADER_ID) | ||
.flatMap(metadata => metadataCache.getAliveBrokerNode(metadata.leader, interBrokerListenerName)) | ||
.flatMap(metadata => OptionConverters.toScala(metadataCache.getAliveBrokerNode(metadata.leader, interBrokerListenerName))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this would be better if we keep the Java Optional and convert it at the end. |
||
} | ||
|
||
private def topicPartitionsToError(transactionData: AddPartitionsToTxnTransaction, error: Errors): Map[TopicPartition, Errors] = { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ import org.apache.kafka.common.requests.ApiError | |
import org.apache.kafka.server.purgatory.DelayedOperation | ||
|
||
import scala.collection.{Map, mutable} | ||
import scala.jdk.javaapi.OptionConverters | ||
|
||
/** A delayed elect leader operation that can be created by the replica manager and watched | ||
* in the elect leader purgatory | ||
|
@@ -74,7 +75,7 @@ class DelayedElectLeader( | |
private def updateWaiting(): Unit = { | ||
val metadataCache = replicaManager.metadataCache | ||
val completedPartitions = waitingPartitions.collect { | ||
case (tp, leader) if metadataCache.getLeaderAndIsr(tp.topic, tp.partition).exists(_.leader == leader) => tp | ||
case (tp, leader) if OptionConverters.toScala(metadataCache.getLeaderAndIsr(tp.topic, tp.partition)).exists(_.leader == leader) => tp | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
} | ||
completedPartitions.foreach { tp => | ||
waitingPartitions -= tp | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simplify to
filter
+isPresent