
Commit e8e2d83

Avoid non-deterministic UDF to filter deleted rows
1 parent 13739bf commit e8e2d83
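
As the diff below shows, the row-level filter for deletion vectors becomes a plain EqualTo comparison on the __skip_row column instead of a non-deterministic UDF, and the DV preprocessing moves out of the PrepareDeltaScan rule into a planner strategy (PreprocessTableWithDVsStrategy) injected by the session extension.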

3 files changed: +8 -9 lines changed

spark/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala (+3)

@@ -132,6 +132,9 @@ class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
       new PrepareDeltaScan(session)
     }
 
+    // Add skip row column and filter.
+    extensions.injectPlannerStrategy(PreprocessTableWithDVsStrategy)
+
     // Tries to load PrepareDeltaSharingScan class with class reflection, when delta-sharing-spark
     // 3.1+ package is installed, this will be loaded and delta sharing batch queries with
     // DeltaSharingFileIndex will be handled by the rule.
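
For context, a minimal sketch of the injection pattern used above; every name other than injectPlannerStrategy itself is hypothetical. A planner strategy builder has type SparkSession => Strategy, which is why a case-class companion object such as PreprocessTableWithDVsStrategy can be passed directly (assuming it is a case class taking a SparkSession).

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical strategy for illustration only.
case class ExampleStrategy(session: SparkSession) extends Strategy {
  // A real strategy pattern-matches logical plan nodes and returns physical
  // plans; returning Nil defers to the other registered strategies.
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

class ExampleExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // The companion object ExampleStrategy conforms to SparkSession => Strategy.
    extensions.injectPlannerStrategy(ExampleStrategy)
  }
}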

spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala (+3 -7)

@@ -26,7 +26,7 @@ import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{Column, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -44,7 +44,7 @@ import org.apache.spark.util.SerializableConfiguration
 * After rule:
 *    <Parent Node> ->
 *      Project(key, value) ->
-*        Filter (udf(__skip_row == 0) ->
+*        Filter (__skip_row == 0) ->
 *          Delta Scan (key, value, __skip_row)
 * - Here we insert a new column `__skip_row` in Delta scan. This value is populated by the
 *   Parquet reader using the DV corresponding to the Parquet file read
@@ -160,11 +160,7 @@ object ScanWithDeletionVectors {
       s"Expected only one column with name=$IS_ROW_DELETED_COLUMN_NAME")
     val skipRowColumnRef = skipRowColumnRefs.head
 
-    val keepRow = DeltaUDF.booleanFromByte( _ == RowIndexFilter.KEEP_ROW_VALUE)
-      .asNondeterministic() // To avoid constant folding the filter based on stats.
-
-    val filterExp = keepRow(new Column(skipRowColumnRef)).expr
-    Filter(filterExp, newScan)
+    Filter(EqualTo(skipRowColumnRef, Literal(RowIndexFilter.KEEP_ROW_VALUE)), newScan)
   }
 
   private def createBroadcastDVMap(
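
This hunk is the core of the change: the keep-row predicate is now an ordinary deterministic Catalyst EqualTo instead of a UDF marked non-deterministic. A minimal sketch of the resulting expression shape, assuming __skip_row is a byte column and RowIndexFilter.KEEP_ROW_VALUE is 0, as the plan comment `Filter (__skip_row == 0)` above suggests; the relation is only a placeholder for the Delta scan.

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation}
import org.apache.spark.sql.types.ByteType

// Stand-in for the __skip_row column populated by the Parquet reader.
val skipRow = AttributeReference("__skip_row", ByteType, nullable = false)()

// Deterministic keep-row predicate: __skip_row == 0. Unlike an opaque UDF,
// the optimizer can analyze this comparison directly.
val keepRow = EqualTo(skipRow, Literal(0.toByte))

// Filter node over a placeholder relation standing in for the Delta scan.
val filtered = Filter(keepRow, LocalRelation(skipRow))

The removed code had to call .asNondeterministic() to stop the optimizer from constant-folding the UDF-based filter using stats; a plain attribute comparison avoids the opaque UDF while keeping the predicate visible to the optimizer.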

spark/src/main/scala/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala (+2 -2)

@@ -52,7 +52,7 @@ trait PrepareDeltaScanBase extends Rule[LogicalPlan]
   with PredicateHelper
   with DeltaLogging
   with OptimizeMetadataOnlyDeltaQuery
-  with PreprocessTableWithDVs { self: PrepareDeltaScan =>
+  with SubqueryTransformerHelper { self: PrepareDeltaScan =>
 
   /**
    * Tracks the first-access snapshots of other logs planned by this rule. The snapshots are
@@ -204,7 +204,7 @@ trait PrepareDeltaScanBase extends Rule[LogicalPlan]
     } else {
       prepareDeltaScanWithoutFileSkipping(plan)
     }
-    preprocessTablesWithDVs(updatedPlan)
+    updatedPlan
   }
 
   protected def prepareDeltaScanWithoutFileSkipping(plan: LogicalPlan): LogicalPlan = {
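
Net effect, as far as these three files show: PrepareDeltaScanBase no longer mixes in PreprocessTableWithDVs or post-processes its result with preprocessTablesWithDVs; the skip-row column and its deterministic filter are instead attached during physical planning by the injected PreprocessTableWithDVsStrategy.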
