Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark] Avoid non-deterministic UDF to filter deleted rows #2576

Closed
wants to merge 1 commit into from

Conversation

cstavr
Copy link
Contributor

@cstavr cstavr commented Jan 26, 2024

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Currently, filtering of rows that are marked as deleted (DVs) is performed with a non-deterministic UDF that is added in the plan during PrepareDeltaScan rule. The problem is that the non-deterministic UDF prevents any filters to be pushed down to the scan, resulting in bad performance. In addition, the non-deterministic UDF prevents a number of optimizations, e.g. reusing subqueries.

To avoid the above issues, this commit replaces the non-deterministic UDF with a standard filter expressions that is injected by the new PreprocessTableWithDVsStrategy before converting the logical plan to a physical one. The DV filter will be the bottom-most filter in the logical plan and so will be placed at the beginning of the filters that are pushed to the FileSourceScanExec node.

Note that the DV filter will not be further pushed down to the Parquet reader because filter pushdown is disabled when DVs are enabled.

How was this patch tested?

Existing tests.

Does this PR introduce any user-facing changes?

No

vkorukanti pushed a commit that referenced this pull request Apr 26, 2024
## Description

Currently, when Deletion Vectors are enabled we disable predicate
pushdown and splitting in scans. This is because we rely on a custom row
index column which is constructed in the executors and cannot not handle
splits and predicates. These restrictions can now be lifted by relying
instead on `metadata.row_index` which was exposed recently after
relevant [work](https://issues.apache.org/jira/browse/SPARK-37980) was
concluded.

Overall, this PR adds predicate pushdown and splits support as follows:

1. Replaces `__delta_internal_is_row_deleted` with
`_metadata.row_index`.
2. Adds a new implementation of `__delta_internal_is_row_deleted` that
is based on `_metadata.row_index`.
3. `IsRowDeleted` filter is now non deterministic to allow predicate
pushdown.

Furthermore, it includes previous relevant
[work](#2576) to remove the UDF
from `IsRowDeleted` filter.

## How was this patch tested?
Added new suites.
scottsand-db pushed a commit that referenced this pull request Apr 26, 2024
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->

Currently, when Deletion Vectors are enabled we disable predicate
pushdown and splitting in scans. This is because we rely on a custom row
index column which is constructed in the executors and cannot not handle
splits and predicates. These restrictions can now be lifted by relying
instead on `metadata.row_index` which was exposed recently after
relevant [work](https://issues.apache.org/jira/browse/SPARK-37980) was
concluded.

Overall, this PR adds predicate pushdown and splits support as follows:

1. Replaces `__delta_internal_is_row_deleted` with
`_metadata.row_index`.
2. Adds a new implementation of `__delta_internal_is_row_deleted` that
is based on `_metadata.row_index`.
3. `IsRowDeleted` filter is now non deterministic to allow predicate
pushdown.

Furthermore, it includes previous relevant
[work](#2576) to remove the UDF
from `IsRowDeleted` filter.

## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

Added new suites.

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
No.
@cstavr cstavr closed this Jun 19, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant