Skip to content

Commit 56bffba

Browse files
prakharjain09jbguerraz
authored and committed
Minor refactor to PrepareDeltaScan
GitOrigin-RevId: 0f73bc43a9965894c891a9ee3b80fd3860185c7e
1 parent 8839c34 commit 56bffba

File tree

2 files changed

+33
-6
lines changed

2 files changed

+33
-6
lines changed

core/src/main/scala/org/apache/spark/sql/delta/stats/DeltaScan.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -84,5 +84,7 @@ case class DeltaScan(
8484
val scanDurationMs: Long,
8585
val dataSkippingType: DeltaDataSkippingType) {
8686
assert(version == scannedSnapshot.version)
87-
def allFilters: ExpressionSet = partitionFilters ++ dataFilters ++ unusedFilters
87+
88+
lazy val filtersUsedForSkipping: ExpressionSet = partitionFilters ++ dataFilters
89+
lazy val allFilters: ExpressionSet = filtersUsedForSkipping ++ unusedFilters
8890
}

core/src/main/scala/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala

+30-5
Original file line numberDiff line numberDiff line change
@@ -318,19 +318,44 @@ case class PreparedDeltaFileIndex(
318318
override def matchingFiles(
319319
partitionFilters: Seq[Expression],
320320
dataFilters: Seq[Expression]): Seq[AddFile] = {
321-
val actualFilters = ExpressionSet(partitionFilters ++ dataFilters)
322-
if (preparedScan.allFilters == actualFilters) {
323-
preparedScan.files.distinct
321+
val currentFilters = ExpressionSet(partitionFilters ++ dataFilters)
322+
val (addFiles, eventData) = if (currentFilters == preparedScan.allFilters ||
323+
currentFilters == preparedScan.filtersUsedForSkipping) {
324+
// [[DeltaScan]] was created using `allFilters` out of which only `filtersUsedForSkipping`
325+
// filters were used for skipping while creating the DeltaScan.
326+
// If currentFilters is same as allFilters, then no need to recalculate files and we can use
327+
// previous results.
328+
// If currentFilters is same as filtersUsedForSkipping, then also we don't need to recalculate
329+
// files as [[DeltaScan.files]] were calculated using filtersUsedForSkipping only. So if we
330+
// recalculate, we will get same result. So we should use previous result in this case also.
331+
val eventData = Map(
332+
"reused" -> true,
333+
"currentFiltersSameAsPreparedAllFilters" -> (currentFilters == preparedScan.allFilters),
334+
"currentFiltersSameAsPreparedFiltersUsedForSkipping" ->
335+
(currentFilters == preparedScan.filtersUsedForSkipping)
336+
)
337+
(preparedScan.files.distinct, eventData)
324338
} else {
325339
logInfo(
326340
s"""
327341
|Prepared scan does not match actual filters. Reselecting files to query.
328342
|Prepared: ${preparedScan.allFilters}
329-
|Actual: ${actualFilters}
343+
|Actual: ${currentFilters}
330344
""".stripMargin)
331-
preparedScan.scannedSnapshot.filesForScan(
345+
val eventData = Map(
346+
"reused" -> false,
347+
"preparedAllFilters" -> preparedScan.allFilters.mkString(","),
348+
"preparedFiltersUsedForSkipping" -> preparedScan.filtersUsedForSkipping.mkString(","),
349+
"currentFilters" -> currentFilters.mkString(",")
350+
)
351+
val files = preparedScan.scannedSnapshot.filesForScan(
332352
projection = Nil, partitionFilters ++ dataFilters).files
353+
(files, eventData)
333354
}
355+
recordDeltaEvent(deltaLog,
356+
opType = "delta.preparedDeltaFileIndex.reuseSkippingResult",
357+
data = eventData)
358+
addFiles
334359
}
335360

336361
/**

0 commit comments

Comments
 (0)