Commit 83c1671

Only extract the columns used in the query, and avoid reading partition values if all values were found in stats
Signed-off-by: Felipe Fujiy Pessoto <[email protected]>
1 parent 219e802 · commit 83c1671
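For context, the rewrite this commit tunes answers COUNT/MIN/MAX-only queries from Delta log metadata instead of scanning data files. A hypothetical spark-shell session (table and column names are illustrative, not from this commit) showing the kind of query the optimization targets:

    // Illustrative only: after this commit, only the stats for `id` are
    // extracted from the log, and partition values are read only when the
    // stats do not cover every referenced column.
    spark.sql("CREATE TABLE events (id INT, category STRING) USING DELTA")
    spark.sql("INSERT INTO events VALUES (1, 'a'), (2, 'b')")
    spark.sql("SELECT COUNT(*), MIN(id), MAX(id) FROM events").show()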

2 files changed: +69 -22 lines changed

spark/src/main/scala/org/apache/spark/sql/delta/perf/OptimizeMetadataOnlyDeltaQuery.scala (+58 -22)
@@ -30,6 +30,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._

 import java.sql.Date
+import java.util.Locale

 /** Optimize COUNT, MIN and MAX expressions on Delta tables.
  * This optimization is only applied when the following conditions are met:
@@ -57,7 +58,8 @@ trait OptimizeMetadataOnlyDeltaQuery {
     val rowCount = extractGlobalCount(tahoeLogFileIndex)

     if (rowCount.isDefined) {
-      lazy val columnStats = extractMinMaxFromDeltaLog(tahoeLogFileIndex)
+      val aggColumnsNames = Set(extractMinMaxFieldNames(plan).map(_.toLowerCase(Locale.ROOT)): _*)
+      val columnStats = extractMinMaxFromDeltaLog(tahoeLogFileIndex, aggColumnsNames)

       def checkStatsExists(attrRef: AttributeReference): Boolean = {
         columnStats.contains(attrRef.name) &&
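A minimal, Spark-free sketch of the new name-set construction above (names are illustrative): aggregate column names are lower-cased once with a fixed locale and held in a Set for cheap, case-insensitive membership tests. Note also that columnStats becomes a strict val, since the set of needed columns is now known up front.

    import java.util.Locale

    object LowerCaseSetSketch extends App {
      val fieldNames = Seq("Column1", "COLUMN2")
      // Set(xs: _*) builds a Set from a varargs expansion of the Seq.
      val lowerCaseColumnNames = Set(fieldNames.map(_.toLowerCase(Locale.ROOT)): _*)
      assert(lowerCaseColumnNames == Set("column1", "column2"))
    }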
@@ -117,6 +119,23 @@ trait OptimizeMetadataOnlyDeltaQuery {
     }
   }

+  private def extractMinMaxFieldNames(plan: Aggregate): Seq[String] = {
+    plan.aggregateExpressions.collect {
+      case Alias(AggregateExpression(
+        Min(minReference: AttributeReference), _, _, _, _), _) =>
+        minReference.name
+      case Alias(AggregateExpression(
+        Max(maxReference: AttributeReference), _, _, _, _), _) =>
+        maxReference.name
+      case Alias(ToPrettyString(AggregateExpression(
+        Min(minReference: AttributeReference), _, _, _, _), _), _) =>
+        minReference.name
+      case Alias(ToPrettyString(AggregateExpression(
+        Max(maxReference: AttributeReference), _, _, _, _), _), _) =>
+        maxReference.name
+    }
+  }
+
   /** Return the number of rows in the table or `None` if we cannot calculate it from stats */
   private def extractGlobalCount(tahoeLogFileIndex: TahoeLogFileIndex): Option[Long] = {
     // account for deleted rows according to deletion vectors
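The collect above relies on partial-function patterns: aggregate shapes that match contribute a column name, and everything else is skipped. A self-contained stand-in (the ADT below is illustrative, not Catalyst):

    object CollectSketch extends App {
      sealed trait AggShape
      case class MinOf(column: String) extends AggShape
      case class MaxOf(column: String) extends AggShape
      case object CountStar extends AggShape

      val aggs: Seq[AggShape] = Seq(MinOf("Column1"), CountStar, MaxOf("Column2"))
      // Non-matching shapes (CountStar) contribute nothing, mirroring how
      // COUNT(1) needs no per-column stats in extractMinMaxFieldNames.
      val fieldNames = aggs.collect {
        case MinOf(name) => name
        case MaxOf(name) => name
      }
      assert(fieldNames == Seq("Column1", "Column2"))
    }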
@@ -141,12 +160,15 @@ trait OptimizeMetadataOnlyDeltaQuery {
    */
  case class DeltaColumnStat(min: Any, max: Any)

-  private def extractMinMaxFromStats(deltaScanGenerator: DeltaScanGenerator):
-    Map[String, DeltaColumnStat] = {
+  private def extractMinMaxFromStats(
+      deltaScanGenerator: DeltaScanGenerator,
+      lowerCaseColumnNames: Set[String]): Map[String, DeltaColumnStat] = {
+
     // TODO Update this to work with DV (https://github.com/delta-io/delta/issues/1485)
     val snapshot = deltaScanGenerator.snapshotToScan
-    val dataColumns = snapshot.statCollectionPhysicalSchema
-      .filter(col => AggregateDeltaTable.isSupportedDataType(col.dataType))
+    val dataColumns = snapshot.statCollectionPhysicalSchema.filter(col =>
+      AggregateDeltaTable.isSupportedDataType(col.dataType) &&
+      lowerCaseColumnNames.contains(col.name.toLowerCase(Locale.ROOT)))

     // Validate all the files has stats
     lazy val filesStatsCount = deltaScanGenerator.filesWithStatsForScan(Nil).select(
@@ -232,12 +254,14 @@ trait OptimizeMetadataOnlyDeltaQuery {
     }
   }

-  private def extractMinMaxFromPartitionValue(snapshot: Snapshot):
-    Map[String, DeltaColumnStat] = {
+  private def extractMinMaxFromPartitionValue(
+      snapshot: Snapshot,
+      lowerCaseColumnNames: Set[String]): Map[String, DeltaColumnStat] = {

     val partitionedColumns = snapshot.metadata.partitionSchema
-      .filter(x => AggregateDeltaTable.isSupportedDataType(x.dataType))
-      .map(x => (x, DeltaColumnMapping.getPhysicalName(x)))
+      .filter(col => AggregateDeltaTable.isSupportedDataType(col.dataType) &&
+        lowerCaseColumnNames.contains(col.name.toLowerCase(Locale.ROOT)))
+      .map(col => (col, DeltaColumnMapping.getPhysicalName(col)))

     if (partitionedColumns.isEmpty) {
       Map.empty
@@ -271,14 +295,21 @@ trait OptimizeMetadataOnlyDeltaQuery {
     }
   }

-  private def extractMinMaxFromDeltaLog(tahoeLogFileIndex: TahoeLogFileIndex):
+  private def extractMinMaxFromDeltaLog(
+      tahoeLogFileIndex: TahoeLogFileIndex,
+      lowerCaseColumnNames: Set[String]):
     CaseInsensitiveMap[DeltaColumnStat] = {
     val deltaScanGenerator = getDeltaScanGenerator(tahoeLogFileIndex)
     val snapshot = deltaScanGenerator.snapshotToScan
+    val columnFromStats = extractMinMaxFromStats(deltaScanGenerator, lowerCaseColumnNames)

+    if (lowerCaseColumnNames.equals(columnFromStats.keySet)) {
+      CaseInsensitiveMap(columnFromStats)
+    } else {
     CaseInsensitiveMap(
-      extractMinMaxFromStats(deltaScanGenerator).++
-        (extractMinMaxFromPartitionValue(snapshot)))
+      columnFromStats.++
+        (extractMinMaxFromPartitionValue(snapshot, lowerCaseColumnNames)))
+    }
   }

   object AggregateDeltaTable {
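This is the commit's second optimization: when file-level stats already cover every requested column, the partition-value extraction is skipped entirely. A hedged sketch of that control flow, with Map[String, (Int, Int)] standing in for Map[String, DeltaColumnStat] and a by-name parameter keeping the fallback lazy:

    object ShortCircuitSketch extends App {
      def combine(
          requested: Set[String],
          fromStats: Map[String, (Int, Int)],
          fromPartitions: => Map[String, (Int, Int)]): Map[String, (Int, Int)] = {
        // If stats covered every requested column, never touch partition values.
        if (requested == fromStats.keySet) fromStats
        else fromStats ++ fromPartitions
      }

      val stats = Map("id" -> (1, 9))
      // The by-name argument is only evaluated on the fallback path.
      assert(combine(Set("id"), stats, sys.error("not evaluated")) == stats)
    }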
@@ -291,20 +322,25 @@ trait OptimizeMetadataOnlyDeltaQuery {
       dataType.isInstanceOf[DateType]
   }

-  private def isAggExprOptimizable(aggExpr: AggregateExpression): Boolean = aggExpr match {
-    case AggregateExpression(
-      Count(Seq(Literal(1, _))), Complete, false, None, _) => true
-    case AggregateExpression(
-      Min(min), Complete, false, None, _) => isSupportedDataType(min.dataType)
-    case AggregateExpression(
-      Max(max), Complete, false, None, _) => isSupportedDataType(max.dataType)
-    case _ => false
+  def getAggFunctionOptimizable(aggExpr: AggregateExpression): Option[DeclarativeAggregate] = {
+    aggExpr match {
+      case AggregateExpression(
+        c @ Count(Seq(Literal(1, _))), Complete, false, None, _) =>
+        Some(c)
+      case AggregateExpression(
+        min @ Min(minExpr), Complete, false, None, _) if isSupportedDataType(minExpr.dataType) =>
+        Some(min)
+      case AggregateExpression(
+        max @ Max(maxExpr), Complete, false, None, _) if isSupportedDataType(maxExpr.dataType) =>
+        Some(max)
+      case _ => None
+    }
   }

   private def isStatsOptimizable(aggExpr: Seq[Alias]): Boolean = aggExpr.forall {
-    case Alias(aggExpr: AggregateExpression, _) => isAggExprOptimizable(aggExpr)
+    case Alias(aggExpr: AggregateExpression, _) => getAggFunctionOptimizable(aggExpr).isDefined
     case Alias(ToPrettyString(aggExpr: AggregateExpression, _), _) =>
-      isAggExprOptimizable(aggExpr)
+      getAggFunctionOptimizable(aggExpr).isDefined
     case _ => false
   }

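The Boolean predicate became an Option-returning extractor: callers that only need a yes/no answer use .isDefined, while the matched aggregate itself is now available for reuse. A small illustration of this refactor pattern (the ADT is illustrative):

    object OptionRefactorSketch extends App {
      sealed trait Agg
      case class MinAgg(col: String) extends Agg
      case class AvgAgg(col: String) extends Agg // an unsupported shape

      // Before: def isOptimizable(a: Agg): Boolean
      // After: return the matched value so callers can do more than test it.
      def getOptimizable(a: Agg): Option[Agg] = a match {
        case m: MinAgg => Some(m)
        case _ => None
      }

      assert(getOptimizable(MinAgg("id")).isDefined)
      assert(getOptimizable(AvgAgg("id")).isEmpty)
    }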
spark/src/test/scala/org/apache/spark/sql/delta/perf/OptimizeMetadataOnlyDeltaQuerySuite.scala (+11 -0)
@@ -725,6 +725,17 @@ class OptimizeMetadataOnlyDeltaQuerySuite
     )
   }

+  test("optimization not supported - min-max column without stats") {
+    val tableName = "TestColumnWithoutStats"
+
+    spark.sql(s"CREATE TABLE $tableName (Column1 INT, Column2 INT) USING DELTA" +
+      s" TBLPROPERTIES('delta.dataSkippingNumIndexedCols' = 1)")
+    spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (1, 2);")
+
+    checkOptimizationIsNotTriggered(
+      s"SELECT MAX(Column2) FROM $tableName")
+  }
+
   test("optimization not supported - filter on partitioned column") {
     val tableName = "TestPartitionedFilter"

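Why this new test expects no rewrite: delta.dataSkippingNumIndexedCols = 1 limits stats collection to the table's first column, so Column2 has no min/max in the Delta log and MAX(Column2) cannot be answered from metadata. A conceptual stand-in for that coverage check (illustrative, not the suite's checkOptimizationIsNotTriggered helper):

    object StatsCoverageSketch extends App {
      val dataSkippingNumIndexedCols = 1
      val schema = Seq("Column1", "Column2")
      // Stats are collected only for the first N columns in schema order.
      val columnsWithStats = schema.take(dataSkippingNumIndexedCols).toSet
      // MAX(Column2) has no stats, so the metadata-only rewrite must not fire.
      assert(!columnsWithStats.contains("Column2"))
    }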