Commit
[Spark] Fix race condition in Uniform conversion (#3189)
## Description

This PR fixes a race condition in the UniForm Iceberg Converter.

Before this change, the UniForm Iceberg Converter ran as follows:

1. Read `lastConvertedDeltaVersion` from the latest Iceberg snapshot.
2. Convert the Delta commits that follow `lastConvertedDeltaVersion` into Iceberg snapshots.
3. Commit the Iceberg snapshots.

When multiple Iceberg conversion threads run concurrently, a race condition can occur in which one Delta commit is written into multiple Iceberg snapshots, corrupting the data.

For example, consider a UniForm table whose latest Delta version and latest Iceberg version are both 1. Two threads, A and B, start writing to the Delta table:

1. Thread A writes Delta version 2, reads `lastConvertedDeltaVersion` = 1, and converts Delta version 2.
2. Thread B writes Delta version 3, reads `lastConvertedDeltaVersion` = 1, and converts Delta versions 2 and 3.
3. Thread A commits Iceberg version 2, which includes converted Delta version 2.
4. Thread B commits Iceberg version 3, which includes converted Delta versions 2 and 3.

Once both threads have committed to Iceberg, Delta version 2 appears in the Iceberg history twice, in two different snapshots. If version 2 contains an AddFile, the same data is inserted into Iceberg twice.

The fix works as follows:

1. Read `lastConvertedDeltaVersion` and **a new field**, `lastConvertedIcebergSnapshotId`, from the latest Iceberg snapshot.
2. Convert the Delta commits that follow `lastConvertedDeltaVersion` into Iceberg snapshots.
3. Before the Iceberg commit, check that the base snapshot ID of this transaction equals `lastConvertedIcebergSnapshotId` (**this check is the core of this change**).
4. Commit the Iceberg snapshots.

This change ensures that each conversion commits against one specific Iceberg snapshot and aborts if that snapshot is no longer the latest one.

With the fix, the interleaving in the example above is blocked:

1. Thread A writes Delta version 2, reads `lastConvertedDeltaVersion` = 1 and `lastConvertedIcebergSnapshotId` = S0, and converts Delta version 2.
2. Thread B writes Delta version 3, reads `lastConvertedDeltaVersion` = 1 and `lastConvertedIcebergSnapshotId` = S0, and converts Delta versions 2 and 3.
3. Thread A creates an Iceberg transaction with parent snapshot S0. Because `lastConvertedIcebergSnapshotId` is also S0, the transaction commits and updates the latest Iceberg snapshot to S1.
4. Thread B creates an Iceberg transaction with parent snapshot S1. Because `lastConvertedIcebergSnapshotId` is S0, which differs from S1, Thread B aborts the conversion.
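
The interleaving described above can be replayed with a small in-memory model. The following Python sketch is illustrative only and is not the converter's actual code: `Snapshot`, `ToyIcebergTable`, `ConcurrentConversionError`, and `run_interleaving` are hypothetical names, integer snapshot IDs 0 and 1 stand in for S0 and S1, and the two "threads" are simulated sequentially in the order given in the example.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


class ConcurrentConversionError(Exception):
    """Raised when the latest snapshot changed after the converter read it."""


@dataclass
class Snapshot:
    snapshot_id: int
    last_converted_delta_version: int
    delta_versions: List[int]  # Delta versions written into this snapshot


@dataclass
class ToyIcebergTable:
    """Hypothetical stand-in for an Iceberg table; not a real Iceberg API."""
    snapshots: List[Snapshot] = field(default_factory=list)

    def latest(self) -> Snapshot:
        return self.snapshots[-1]

    def commit(self, delta_versions: List[int],
               expected_base_id: Optional[int]) -> None:
        # The transaction is based on whatever snapshot is latest right now.
        base = self.latest()
        # Core check: abort if the base differs from the snapshot ID that was
        # read at the start of the conversion. None disables the check and
        # models the behavior before the fix.
        if expected_base_id is not None and base.snapshot_id != expected_base_id:
            raise ConcurrentConversionError(
                f"expected base {expected_base_id}, found {base.snapshot_id}")
        self.snapshots.append(
            Snapshot(base.snapshot_id + 1, max(delta_versions),
                     list(delta_versions)))


def run_interleaving(check_base: bool) -> Tuple[List[int], bool]:
    """Replay the A/B interleaving; return (converted versions, B aborted)."""
    table = ToyIcebergTable([Snapshot(0, 1, [1])])  # S0, Delta version 1

    # Both threads read the table state before either one commits.
    a_last = table.latest().last_converted_delta_version
    a_base = table.latest().snapshot_id
    b_last = table.latest().last_converted_delta_version
    b_base = table.latest().snapshot_id

    a_versions = list(range(a_last + 1, 3))  # A sees Delta up to 2 -> [2]
    b_versions = list(range(b_last + 1, 4))  # B sees Delta up to 3 -> [2, 3]

    table.commit(a_versions, a_base if check_base else None)
    b_aborted = False
    try:
        table.commit(b_versions, b_base if check_base else None)
    except ConcurrentConversionError:
        b_aborted = True

    converted = [v for s in table.snapshots for v in s.delta_versions]
    return converted, b_aborted


if __name__ == "__main__":
    print(run_interleaving(check_base=False))  # ([1, 2, 2, 3], False)
    print(run_interleaving(check_base=True))   # ([1, 2], True)
```

Without the check, Delta version 2 ends up in two snapshots. With the check, Thread B's commit is rejected because the table has moved from snapshot 0 to snapshot 1 since B read it. In this toy model Delta version 3 is simply left unconverted after the abort; how the real converter retries or catches up is outside what this sketch covers.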