-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Spark] Support predicate pushdown in scans with DVs #2933
[Spark] Support predicate pushdown in scans with DVs #2933
Conversation
b8b7ea8
to
6657d63
Compare
# This is the 1st commit message: flush # This is the commit message delta-io#2: flush # This is the commit message delta-io#3: First sane version without isRowDeleted # This is the commit message delta-io#4: Hack RowIndexMarkingFilters # This is the commit message delta-io#5: Add support for non-vectorized readers # This is the commit message delta-io#6: Metadata column fix
# This is the 1st commit message: flush # This is the commit message delta-io#2: flush # This is the commit message delta-io#3: First sane version without isRowDeleted # This is the commit message delta-io#4: Hack RowIndexMarkingFilters # This is the commit message delta-io#5: Add support for non-vectorized readers # This is the commit message delta-io#6: Metadata column fix # This is the commit message delta-io#7: Avoid non-deterministic UDF to filter deleted rows # This is the commit message delta-io#8: metadata with Expression ID # This is the commit message delta-io#9: Fix complex views issue # This is the commit message delta-io#10: Tests # This is the commit message delta-io#11: cleaning # This is the commit message delta-io#12: More tests and fixes
flush First sane version without isRowDeleted Hack RowIndexMarkingFilters Add support for non-vectorized readers Metadata column fix Avoid non-deterministic UDF to filter deleted rows metadata with Expression ID Fix complex views issue Tests cleaning More tests and fixes Partial cleaning
# This is the 1st commit message: flush # This is the commit message delta-io#2: flush # This is the commit message delta-io#3: First sane version without isRowDeleted # This is the commit message delta-io#4: Hack RowIndexMarkingFilters # This is the commit message delta-io#5: Add support for non-vectorized readers # This is the commit message delta-io#6: Metadata column fix # This is the commit message delta-io#7: Avoid non-deterministic UDF to filter deleted rows # This is the commit message delta-io#8: metadata with Expression ID # This is the commit message delta-io#9: Fix complex views issue # This is the commit message delta-io#10: Tests # This is the commit message delta-io#11: cleaning # This is the commit message delta-io#12: More tests and fixes # This is the commit message delta-io#13: Partial cleaning # This is the commit message delta-io#14: cleaning and improvements # This is the commit message delta-io#15: cleaning and improvements # This is the commit message delta-io#16: Clean RowIndexFilter
flush First sane version without isRowDeleted Hack RowIndexMarkingFilters Add support for non-vectorized readers Metadata column fix Avoid non-deterministic UDF to filter deleted rows metadata with Expression ID Fix complex views issue Tests cleaning More tests and fixes Partial cleaning cleaning and improvements cleaning and improvements Clean RowIndexFilter Clean DeltaParquetFileFormat Improve DeletionVectorsSuite Disable DeltaParquetFileFormatSuite for predicate pushdown.
6657d63
to
8bdfd8c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, looks like some of the changes in the DeltaParquetFileFormat
can be reverted/simplified.
spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/delta/deletionvectors/RowIndexMarkingFilters.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor comments.
The logic of DeltaParquetFileFormat is getting out of hand but I don't see a fesible way to improve it in short term.
I did some cleaning. It looks better now. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Outdated
Show resolved
Hide resolved
@andreaschat-db do you know why we were using __delta_internal_is_row_deleted in previous version (2.4) if row_index is available in Spark 3.4? Any known issue in Spark 3.4? I wonder if this is the reason: https://issues.apache.org/jira/browse/SPARK-39634 Do you think it is possible to backport to Delta 2.4? Split wouldn't work because Spark 3.4 doesn't support it, but at least the predicate pushdown could work? |
Hi Felipe, There was an issue in PARQUET MR that prevented the correct construction of the row indexes. To backport this, Spark needs to be paired with a parquet version that contains the fix. |
The fixed version, 1.12.3 is in 3.4: https://github.com/apache/spark/blob/da0c7cc81bb3d69d381dd0683e910eae4c80e9ae/pom.xml#L143 I think splits would not be possible yet, because of https://issues.apache.org/jira/browse/PARQUET-2161, which is fixed in 1.13.0 (Spark 3.5) Spark 3.4 ParquetScan.isSplitable, where this is mentioned: https://github.com/apache/spark/blob/da0c7cc81bb3d69d381dd0683e910eae4c80e9ae/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala#L50C1-L57C4
But filter pushdown might work. |
Which Delta project/connector is this regarding?
Description
Currently, when Deletion Vectors are enabled we disable predicate pushdown and splitting in scans. This is because we rely on a custom row index column which is constructed in the executors and cannot not handle splits and predicates. These restrictions can now be lifted by relying instead on
metadata.row_index
which was exposed recently after relevant work was concluded.Overall, this PR adds predicate pushdown and splits support as follows:
__delta_internal_is_row_deleted
with_metadata.row_index
.__delta_internal_is_row_deleted
that is based on_metadata.row_index
.IsRowDeleted
filter is now non deterministic to allow predicate pushdown.Furthermore, it includes previous relevant work to remove the UDF from
IsRowDeleted
filter.How was this patch tested?
Added new suites.
Does this PR introduce any user-facing changes?
No.