Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark] Fix race condition in Uniform conversion #3189

Merged
merged 1 commit into from
Jun 3, 2024

Conversation

harperjiang
Copy link
Contributor

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

This PR fixes a race condition in UniForm Iceberg Converter.

Before our change, UniForm Iceberg Converter executes as follows:

  1. Read lastConvertedDeltaVersion from Iceberg latest snapshot
  2. Convert the delta commits starting from lastConvertedDeltaVersion to iceberg snapshots
  3. Commit the iceberg snapshots.

When there are multiple iceberg conversion threads, a race condition may occur, causing one delta commit to be written into multiple Iceberg snapshots, and data corruption.

As an example, considering we have a UniForm table with latest delta version and iceberg version both 1. Two threads A and B start writing to delta tables.

  1. Thread A writes Delta version 2, reads lastConvertedDeltaVersion = 1, and converts delta version 2.
  2. Thread B writes Delta version 3, reads lastConvertedDeltaVersion = 1, and converts delta version 2, 3.
  3. Thread A commits Iceberg version 2, including converted delta version 2.
  4. Thread B commits Iceberg version 3, including converted delta version 2 and 3.

When both threads commit to Iceberg, we will have delta version 2 included in iceberg history twice as different snapshots. If version 2 is an AddFile, that means we insert the same data twice into iceberg.

Our fix works as follows:

  1. Read lastConvertedDeltaVersion and a new field lastConvertedIcebergSnapshotId from Iceberg latest snapshot
  2. Convert the delta commits starting from lastConvertedDeltaVersion to iceberg snapshots
  3. Before Iceberg Commits, checks that the base snapshot ID of this transaction equals lastConvertedIcebergSnapshotId (this check is the core of this change)
  4. Commit the iceberg snapshots.

This change makes sure we are only committing against a specific Iceberg snapshot, and will abort if the snapshot we want to commit against is not the latest one. As an example, our fix will successfully block the example above.

  1. Thread A writes Delta version 2, reads lastConvertedDeltaVersion = 1, lastConvertedIcebergSnapshotId = S0 and converts delta version 2.
  2. Thread B writes Delta version 3, reads lastConvertedDeltaVersion = 1, lastConvertedIcebergSnapshotId = S0 and converts delta version 2, 3.
  3. Thread A creates an Iceberg transaction with parent snapshot S0. Because lastConvertedIcebergSnapshotId is also S0, it commits and update iceberg latest snapshot to S1.
  4. Thread B creates an Iceberg transaction, with parent snapshot S1. Because lastConvertedIcebergSnapshotId is S0 != S1, it aborts the conversion.

How was this patch tested?

Does this PR introduce any user-facing changes?

Copy link
Contributor

@lzlfred lzlfred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@vkorukanti vkorukanti merged commit be9718d into delta-io:master Jun 3, 2024
9 of 10 checks passed
richardc-db pushed a commit to richardc-db/delta that referenced this pull request Jun 5, 2024
## Description
This PR fixes a race condition in UniForm Iceberg Converter. 

Before our change, UniForm Iceberg Converter executes as follows:
1. Read `lastConvertedDeltaVersion` from Iceberg latest snapshot
2. Convert the delta commits starting from `lastConvertedDeltaVersion`
to iceberg snapshots
3. Commit the iceberg snapshots.

When there are multiple iceberg conversion threads, a race condition may
occur, causing one delta commit to be written into multiple Iceberg
snapshots, and data corruption.

As an example, considering we have a UniForm table with latest delta
version and iceberg version both 1. Two threads A and B start writing to
delta tables.

1. Thread A writes Delta version 2, reads `lastConvertedDeltaVersion` =
1, and converts delta version 2.
2. Thread B writes Delta version 3, reads `lastConvertedDeltaVersion` =
1, and converts delta version 2, 3.
3. Thread A commits Iceberg version 2, including converted delta version
2.
4. Thread B commits Iceberg version 3, including converted delta version
2 and 3.

When both threads commit to Iceberg, we will have delta version 2
included in iceberg history twice as different snapshots. If version 2
is an AddFile, that means we insert the same data twice into iceberg.

Our fix works as follows: 
1. Read `lastConvertedDeltaVersion` and **a new field**
`lastConvertedIcebergSnapshotId` from Iceberg latest snapshot
2. Convert the delta commits starting from `lastConvertedDeltaVersion`
to iceberg snapshots
5. Before Iceberg Commits, checks that the base snapshot ID of this
transaction equals `lastConvertedIcebergSnapshotId` (**this check is the
core of this change**)
6. Commit the iceberg snapshots.

This change makes sure we are only committing against a specific Iceberg
snapshot, and will abort if the snapshot we want to commit against is
not the latest one. As an example, our fix will successfully block the
example above.

1. Thread A writes Delta version 2, reads `lastConvertedDeltaVersion` =
1, `lastConvertedIcebergSnapshotId` = S0 and converts delta version 2.
2. Thread B writes Delta version 3, reads `lastConvertedDeltaVersion` =
1, `lastConvertedIcebergSnapshotId` = S0 and converts delta version 2,
3.
3. Thread A creates an Iceberg transaction with parent snapshot S0.
Because `lastConvertedIcebergSnapshotId` is also S0, it commits and
update iceberg latest snapshot to S1.
4. Thread B creates an Iceberg transaction, with parent snapshot S1.
Because `lastConvertedIcebergSnapshotId` is S0 != S1, it aborts the
conversion.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants