[Spark] Fix race condition in Uniform conversion #3189
Merged
Which Delta project/connector is this regarding?
Description
This PR fixes a race condition in UniForm Iceberg Converter.
Before our change, UniForm Iceberg Converter executes as follows:

1. Read `lastConvertedDeltaVersion` from Iceberg latest snapshot
2. Convert the Delta commits after `lastConvertedDeltaVersion` to Iceberg snapshots

When there are multiple Iceberg conversion threads, a race condition may occur, causing one Delta commit to be written into multiple Iceberg snapshots and corrupting data.
As an example, consider a UniForm table whose latest Delta version and Iceberg version are both 1. Two threads, A and B, start writing to the Delta table.

1. Thread A reads `lastConvertedDeltaVersion` = 1, and converts Delta version 2.
2. Thread B reads `lastConvertedDeltaVersion` = 1, and converts Delta versions 2 and 3.

When both threads commit to Iceberg, Delta version 2 is included in the Iceberg history twice, as different snapshots. If version 2 is an AddFile, the same data is inserted into Iceberg twice.
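The interleaving above can be reproduced with a small single-process simulation. This is only a sketch: `ToyIcebergTable` and its fields are hypothetical stand-ins for an Iceberg table, not part of Delta or Iceberg, and the two "threads" are modelled as two sequential read/commit pairs.

```python
from dataclasses import dataclass, field


@dataclass
class ToyIcebergTable:
    """Hypothetical in-memory stand-in for an Iceberg table's snapshot log."""
    # Each snapshot is recorded as the list of Delta versions it was built from.
    snapshots: list = field(default_factory=list)
    last_converted_delta_version: int = 1

    def commit(self, delta_versions):
        # Unguarded commit: appends a snapshot without checking whether the
        # table changed since the caller read last_converted_delta_version.
        self.snapshots.append(list(delta_versions))
        self.last_converted_delta_version = max(delta_versions)


table = ToyIcebergTable()

# Both threads read the same starting point before either one commits.
read_by_a = table.last_converted_delta_version
read_by_b = table.last_converted_delta_version

# Thread A sees Delta version 2; thread B sees Delta versions 2 and 3.
pending_a = list(range(read_by_a + 1, 3))
pending_b = list(range(read_by_b + 1, 4))

table.commit(pending_a)
table.commit(pending_b)

# Count how many snapshots contain Delta version 2.
times_v2_converted = sum(2 in snap for snap in table.snapshots)
print(table.snapshots)     # [[2], [2, 3]]
print(times_v2_converted)  # 2
```

Delta version 2 ends up in two snapshots because nothing ties a commit to the state the converter originally read.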
Our fix works as follows:

1. Read `lastConvertedDeltaVersion` and a new field, `lastConvertedIcebergSnapshotId`, from Iceberg latest snapshot
2. Convert the Delta commits after `lastConvertedDeltaVersion` to Iceberg snapshots
3. Before committing, check that the Iceberg latest snapshot still matches `lastConvertedIcebergSnapshotId` (this check is the core of this change)

This change makes sure we only commit against a specific Iceberg snapshot, and abort if the snapshot we want to commit against is no longer the latest one. As an example, our fix will successfully block the example above.
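The check can be sketched as a compare-then-commit on the snapshot ID. The code below is an illustration under assumptions, not the converter's implementation: `GuardedIcebergTable`, `StaleSnapshotError`, `read_state`, and `commit` are hypothetical names, and a lock stands in for whatever makes the check and the commit atomic in the real system.

```python
import threading
from dataclasses import dataclass, field


class StaleSnapshotError(Exception):
    """Hypothetical error: the base snapshot is no longer the latest one."""


@dataclass
class GuardedIcebergTable:
    """Hypothetical in-memory stand-in for an Iceberg table."""
    latest_snapshot_id: str = "S0"
    last_converted_delta_version: int = 1
    history: list = field(default_factory=list)
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)

    def read_state(self):
        # Read the last converted Delta version and the snapshot ID together.
        return self.last_converted_delta_version, self.latest_snapshot_id

    def commit(self, delta_versions, base_snapshot_id, new_snapshot_id):
        # Check and commit under one lock so no other commit can slip between.
        with self._lock:
            if base_snapshot_id != self.latest_snapshot_id:
                raise StaleSnapshotError(
                    f"{base_snapshot_id} != {self.latest_snapshot_id}"
                )
            self.history.append(list(delta_versions))
            self.last_converted_delta_version = max(delta_versions)
            self.latest_snapshot_id = new_snapshot_id


table = GuardedIcebergTable()

# Both converters read (1, "S0") before either one commits.
version_a, base_a = table.read_state()
version_b, base_b = table.read_state()

# A commits first and moves the latest snapshot to S1.
table.commit([2], base_a, "S1")

# B still holds S0 as its base, so its commit is rejected.
try:
    table.commit([2, 3], base_b, "S2")
    b_aborted = False
except StaleSnapshotError:
    b_aborted = True

print(table.history, table.latest_snapshot_id, b_aborted)
```

In this sketch the second commit is rejected, so Delta version 2 appears in the history only once. The same sequence, step by step: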

1. Thread A reads `lastConvertedDeltaVersion` = 1, `lastConvertedIcebergSnapshotId` = S0 and converts Delta version 2.
2. Thread B reads `lastConvertedDeltaVersion` = 1, `lastConvertedIcebergSnapshotId` = S0 and converts Delta versions 2 and 3.
3. Thread A tries to commit. Since the Iceberg latest snapshot still matches its `lastConvertedIcebergSnapshotId` (also S0), it commits and updates the Iceberg latest snapshot to S1.
4. Thread B tries to commit. Since its `lastConvertedIcebergSnapshotId` is S0 != S1, it aborts the conversion.

How was this patch tested?
Does this PR introduce any user-facing changes?