
[Spark] Support predicate pushdown in scans with DVs #2933

Merged

Conversation

andreaschat-db
Contributor

@andreaschat-db andreaschat-db commented Apr 21, 2024

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Currently, when Deletion Vectors are enabled, we disable predicate pushdown and splitting in scans. This is because we rely on a custom row index column that is constructed in the executors and cannot handle splits and predicates. These restrictions can now be lifted by relying instead on _metadata.row_index, which was recently exposed after the relevant upstream work concluded.

Overall, this PR adds predicate pushdown and splits support as follows:

  1. Replaces __delta_internal_is_row_deleted with _metadata.row_index.
  2. Adds a new implementation of __delta_internal_is_row_deleted that is based on _metadata.row_index.
  3. Marks the IsRowDeleted filter as non-deterministic to allow predicate pushdown.

Furthermore, it includes previous relevant work to remove the UDF from the IsRowDeleted filter.
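Conceptually, a deletion vector marks deleted rows by their absolute position in the data file, so once each row carries a file-level row index (the role `_metadata.row_index` plays), the IsRowDeleted check reduces to a bitmap membership test. A minimal Python sketch of this idea (purely illustrative, not Delta's actual implementation; all names here are made up):

```python
def filter_deleted_rows(rows_with_index, deletion_vector):
    """Keep rows whose file-level row index is not in the deletion vector.

    rows_with_index: iterable of (row_index, value) pairs, as a reader
    that exposes a metadata row index would produce them.
    deletion_vector: a set of deleted row indexes (standing in for the
    roaring bitmap Delta actually stores).
    """
    return [value for idx, value in rows_with_index if idx not in deletion_vector]

# Because each row carries its absolute index within the file, the check
# still works when predicate pushdown has already dropped rows upstream:
surviving_batch = [(0, "a"), (2, "c"), (5, "f")]  # rows left after pushdown
dv = {2, 4}                                       # row indexes marked deleted
print(filter_deleted_rows(surviving_batch, dv))   # ['a', 'f']
```

A per-file counter maintained by the reader cannot make this guarantee, which is why the old custom column blocked pushdown and splitting in the first place.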

How was this patch tested?

Added new suites.

Does this PR introduce any user-facing changes?

No.

@andreaschat-db andreaschat-db changed the title [TEST] Support predicate pushday in scans with DVs [TEST][DO_NOT_MERGE] Support predicate pushday in scans with DVs Apr 21, 2024
@andreaschat-db andreaschat-db changed the title [TEST][DO_NOT_MERGE] Support predicate pushday in scans with DVs [TEST][DO_NOT_MERGE] Support predicate pushdown in scans with DVs Apr 21, 2024
@andreaschat-db andreaschat-db force-pushed the supportPredicatePushdayInScansWithDVs branch from b8b7ea8 to 6657d63 Compare April 23, 2024 15:12
The force-pushed branch carried the following commit messages (deduplicated; several were placeholder "flush" commits):

flush
First sane version without isRowDeleted
Hack RowIndexMarkingFilters
Add support for non-vectorized readers
Metadata column fix
Avoid non-deterministic UDF to filter deleted rows
metadata with Expression ID
Fix complex views issue
Tests
cleaning
More tests and fixes
Partial cleaning
cleaning and improvements
Clean RowIndexFilter
Clean DeltaParquetFileFormat
Improve DeletionVectorsSuite
Disable DeltaParquetFileFormatSuite for predicate pushdown.
@andreaschat-db andreaschat-db force-pushed the supportPredicatePushdayInScansWithDVs branch from 6657d63 to 8bdfd8c Compare April 23, 2024 18:14
@andreaschat-db andreaschat-db changed the title [TEST][DO_NOT_MERGE] Support predicate pushdown in scans with DVs [Spark] Support predicate pushdown in scans with DVs Apr 24, 2024
Collaborator

@vkorukanti vkorukanti left a comment


LGTM, looks like some of the changes in the DeltaParquetFileFormat can be reverted/simplified.

Collaborator

@xupefei xupefei left a comment


Minor comments.
The logic of DeltaParquetFileFormat is getting out of hand, but I don't see a feasible way to improve it in the short term.

@andreaschat-db
Contributor Author

Minor comments. The logic of DeltaParquetFileFormat is getting out of hand, but I don't see a feasible way to improve it in the short term.

I did some cleaning. It looks better now.

Collaborator

@vkorukanti vkorukanti left a comment


lgtm

@vkorukanti vkorukanti merged commit f4a4944 into delta-io:master Apr 26, 2024
7 of 8 checks passed
@felipepessoto
Contributor

felipepessoto commented May 7, 2024

@andreaschat-db do you know why we were using __delta_internal_is_row_deleted in the previous version (2.4), given that row_index is available in Spark 3.4? Any known issue in Spark 3.4?

I wonder if this is the reason: https://issues.apache.org/jira/browse/SPARK-39634

Do you think it is possible to backport this to Delta 2.4? Splits wouldn't work because Spark 3.4 doesn't support them, but at least predicate pushdown could work?

@andreaschat-db
Contributor Author

andreaschat-db commented May 8, 2024

@andreaschat-db do you know why we were using __delta_internal_is_row_deleted in the previous version (2.4), given that row_index is available in Spark 3.4? Any known issue in Spark 3.4?

I wonder if this is the reason: https://issues.apache.org/jira/browse/SPARK-39634

Do you think it is possible to backport this to Delta 2.4? Splits wouldn't work because Spark 3.4 doesn't support them, but at least predicate pushdown could work?

Hi Felipe,

There was an issue in parquet-mr that prevented the correct construction of row indexes. To backport this, Spark needs to be paired with a Parquet version that contains the fix.

@felipepessoto
Contributor

felipepessoto commented May 8, 2024

The fixed version, 1.12.3, is in Spark 3.4: https://github.com/apache/spark/blob/da0c7cc81bb3d69d381dd0683e910eae4c80e9ae/pom.xml#L143

I think splits would not be possible yet because of https://issues.apache.org/jira/browse/PARQUET-2161, which is only fixed in 1.13.0 (Spark 3.5).

Spark 3.4 ParquetScan.isSplitable, where this is mentioned: https://github.com/apache/spark/blob/da0c7cc81bb3d69d381dd0683e910eae4c80e9ae/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala#L50C1-L57C4

  override def isSplitable(path: Path): Boolean = {
    // If aggregate is pushed down, only the file footer will be read once,
    // so file should not be split across multiple tasks.
    pushedAggregate.isEmpty &&
      // SPARK-39634: Allow file splitting in combination with row index generation once
      // the fix for PARQUET-2161 is available.
      !RowIndexUtil.isNeededForSchema(readSchema)
  }

But filter pushdown might work.
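The split problem the comment in ParquetScan.isSplitable alludes to can be seen with a toy model (purely illustrative, not Spark's code): a reader-side counter, which is effectively what a custom row index column is, restarts in every task, while an index derived from file metadata stays absolute per file.

```python
def counter_based_indexes(num_rows, split_start=0):
    # A per-task counter restarts at 0, so a split that begins at file
    # row `split_start` mislabels every row it reads.
    return list(range(num_rows))

def metadata_based_indexes(num_rows, split_start=0):
    # An index computed from file metadata stays absolute, so
    # deletion-vector lookups remain correct within any split.
    return list(range(split_start, split_start + num_rows))

# A split covering file rows 100..102:
print(counter_based_indexes(3, split_start=100))   # [0, 1, 2]  (wrong for DV lookups)
print(metadata_based_indexes(3, split_start=100))  # [100, 101, 102]
```

Under this model, filter pushdown alone (no splitting) keeps the counter correct only when the task still reads the file from row 0 and the counter is advanced for skipped rows, which is exactly the fragility the metadata-based index removes.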
