
Commit 16ddfcb

Avoid non-deterministic UDF to filter deleted rows
1 parent 13739bf commit 16ddfcb

File tree

4 files changed (+51 -9 lines)


spark/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala

+3
@@ -132,6 +132,9 @@ class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
       new PrepareDeltaScan(session)
     }
 
+    // Add skip row column and filter.
+    extensions.injectPlannerStrategy(PreprocessTableWithDVsStrategy)
+
     // Tries to load PrepareDeltaSharingScan class with class reflection, when delta-sharing-spark
     // 3.1+ package is installed, this will be loaded and delta sharing batch queries with
     // DeltaSharingFileIndex will be handled by the rule.
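
Context for this hunk: injectPlannerStrategy registers the new strategy with every session that loads the Delta extension, so no further wiring is needed on the user side. A minimal sketch of how such a session is typically created (the application name and table path below are illustrative, not part of this commit):

import org.apache.spark.sql.SparkSession

// Once the Delta extension is registered, every rule and planner strategy it
// injects (now including PreprocessTableWithDVsStrategy) takes part in planning.
val spark = SparkSession.builder()
  .appName("dv-filter-demo")
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

// Reading a Delta table with deletion vectors now plans the skip-row filter
// through the injected strategy; the path is made up for illustration.
spark.read.format("delta").load("/tmp/dv_table").show()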

spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala

+3 -7
@@ -26,7 +26,7 @@ import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{Column, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -44,7 +44,7 @@ import org.apache.spark.util.SerializableConfiguration
  * After rule:
  *    <Parent Node> ->
  *      Project(key, value) ->
- *        Filter (udf(__skip_row == 0) ->
+ *        Filter (__skip_row == 0) ->
 *          Delta Scan (key, value, __skip_row)
 *   - Here we insert a new column `__skip_row` in Delta scan. This value is populated by the
 *     Parquet reader using the DV corresponding to the Parquet file read
@@ -160,11 +160,7 @@ object ScanWithDeletionVectors {
       s"Expected only one column with name=$IS_ROW_DELETED_COLUMN_NAME")
     val skipRowColumnRef = skipRowColumnRefs.head
 
-    val keepRow = DeltaUDF.booleanFromByte( _ == RowIndexFilter.KEEP_ROW_VALUE)
-      .asNondeterministic() // To avoid constant folding the filter based on stats.
-
-    val filterExp = keepRow(new Column(skipRowColumnRef)).expr
-    Filter(filterExp, newScan)
+    Filter(EqualTo(skipRowColumnRef, Literal(RowIndexFilter.KEEP_ROW_VALUE)), newScan)
   }
 
   private def createBroadcastDVMap(
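
The heart of the change is the last hunk above: the row-keeping predicate is now a plain Catalyst EqualTo between the skip-row column and RowIndexFilter.KEEP_ROW_VALUE, instead of a byte-to-boolean UDF that had to be marked non-deterministic to dodge constant folding. Because the filter is now produced during planning (see the new strategy below) and sits directly above the scan, the deterministic form is safe and stays visible to the optimizer. A self-contained sketch of the same filtering pattern at the DataFrame level, assuming a keep-row value of byte 0 (an assumption mirroring RowIndexFilter.KEEP_ROW_VALUE, not taken from this diff):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

val spark = SparkSession.builder().master("local[*]").appName("skip-row-sketch").getOrCreate()
import spark.implicits._

// Toy stand-in for a scan that exposes a byte-typed skip-row column:
// 0 = keep the row, anything else = row is deleted via a deletion vector.
val scanned = Seq(("a", 0.toByte), ("b", 1.toByte), ("c", 0.toByte))
  .toDF("key", "__skip_row")

// Deterministic equality, the DataFrame-level analogue of
// EqualTo(skipRowColumnRef, Literal(RowIndexFilter.KEEP_ROW_VALUE)).
val kept = scanned.filter(col("__skip_row") === lit(0.toByte)).drop("__skip_row")
kept.show()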

spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVsStrategy.scala

+43

@@ -0,0 +1,43 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta
+
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.{FileSourceStrategy, HadoopFsRelation, LogicalRelation}
+
+/**
+ * Strategy to process tables with DVs and add the skip row column and filters.
+ *
+ * This strategy will apply all transformations needed to tables with DVs and delegate to
+ * [[FileSourceStrategy]] to create the final plan. The DV filter will be the bottom-most filter in
+ * the plan and so it will be pushed down to the FileSourceScanExec at the beginning of the filter
+ * list.
+ */
+case class PreprocessTableWithDVsStrategy(session: SparkSession)
+  extends Strategy
+  with PreprocessTableWithDVs {
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case ScanOperation(_, _, _, _ @ LogicalRelation(_: HadoopFsRelation, _, _, _)) =>
+      val updatedPlan = preprocessTablesWithDVs(plan)
+      FileSourceStrategy(updatedPlan)
+    case _ => Nil
+  }
+}
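
For readers new to this extension point: a Strategy receives each logical plan fragment during physical planning, returns candidate SparkPlans for the fragments it understands, and returns Nil to defer, exactly as the case _ => Nil branch above defers non-file-source plans to the built-in strategies. A toy, purely illustrative strategy (not part of this commit) registered through the experimental hook shows that contract:

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical no-op strategy: it only observes the plans offered to it and
// always returns Nil, leaving planning to the default strategies.
object LoggingStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
    println(s"planner offered: ${plan.nodeName}")
    Nil
  }
}

val spark = SparkSession.builder().master("local[*]").appName("strategy-contract").getOrCreate()
// extraStrategies is the lightest way to try out a Strategy without writing a session extension.
spark.experimental.extraStrategies = Seq(LoggingStrategy)
spark.range(5).filter("id > 2").collect()  // triggers planning, printing the node names seen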

spark/src/main/scala/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala

+2 -2
@@ -52,7 +52,7 @@ trait PrepareDeltaScanBase extends Rule[LogicalPlan]
   with PredicateHelper
   with DeltaLogging
   with OptimizeMetadataOnlyDeltaQuery
-  with PreprocessTableWithDVs { self: PrepareDeltaScan =>
+  with SubqueryTransformerHelper { self: PrepareDeltaScan =>
 
   /**
    * Tracks the first-access snapshots of other logs planned by this rule. The snapshots are
@@ -204,7 +204,7 @@ trait PrepareDeltaScanBase extends Rule[LogicalPlan]
     } else {
       prepareDeltaScanWithoutFileSkipping(plan)
     }
-    preprocessTablesWithDVs(updatedPlan)
+    updatedPlan
   }
 
   protected def prepareDeltaScanWithoutFileSkipping(plan: LogicalPlan): LogicalPlan = {
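
Net effect of this last file: PrepareDeltaScan no longer rewrites DV tables itself; that work moves into the planner strategy, so the skip-row predicate is attached immediately before physical planning and ends up as the bottom-most filter on the file scan. A quick way to eyeball the result on a DV-enabled table, reusing the hypothetical session and table path from the first sketch (exact plan text varies across Spark and Delta versions):

// The formatted plan should show the file scan emitting the internal
// skip-row column with a deterministic equality filter directly above or
// pushed into it, and no non-deterministic UDF anywhere in the plan.
val df = spark.read.format("delta").load("/tmp/dv_table")
df.explain("formatted")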
