From be9718dfc1eb0b9cedd428182eb8f3d604fd7663 Mon Sep 17 00:00:00 2001 From: Hao Jiang Date: Mon, 3 Jun 2024 10:02:14 -0700 Subject: [PATCH] [Spark] Fix race condition in Uniform conversion (#3189) ## Description This PR fixes a race condition in UniForm Iceberg Converter. Before our change, UniForm Iceberg Converter executes as follows: 1. Read `lastConvertedDeltaVersion` from Iceberg latest snapshot 2. Convert the delta commits starting from `lastConvertedDeltaVersion` to iceberg snapshots 3. Commit the iceberg snapshots. When there are multiple iceberg conversion threads, a race condition may occur, causing one delta commit to be written into multiple Iceberg snapshots, and data corruption. As an example, considering we have a UniForm table with latest delta version and iceberg version both 1. Two threads A and B start writing to delta tables. 1. Thread A writes Delta version 2, reads `lastConvertedDeltaVersion` = 1, and converts delta version 2. 2. Thread B writes Delta version 3, reads `lastConvertedDeltaVersion` = 1, and converts delta version 2, 3. 3. Thread A commits Iceberg version 2, including converted delta version 2. 4. Thread B commits Iceberg version 3, including converted delta version 2 and 3. When both threads commit to Iceberg, we will have delta version 2 included in iceberg history twice as different snapshots. If version 2 is an AddFile, that means we insert the same data twice into iceberg. Our fix works as follows: 1. Read `lastConvertedDeltaVersion` and **a new field** `lastConvertedIcebergSnapshotId` from Iceberg latest snapshot 2. Convert the delta commits starting from `lastConvertedDeltaVersion` to iceberg snapshots 5. Before Iceberg Commits, checks that the base snapshot ID of this transaction equals `lastConvertedIcebergSnapshotId` (**this check is the core of this change**) 6. Commit the iceberg snapshots. This change makes sure we are only committing against a specific Iceberg snapshot, and will abort if the snapshot we want to commit against is not the latest one. As an example, our fix will successfully block the example above. 1. Thread A writes Delta version 2, reads `lastConvertedDeltaVersion` = 1, `lastConvertedIcebergSnapshotId` = S0 and converts delta version 2. 2. Thread B writes Delta version 3, reads `lastConvertedDeltaVersion` = 1, `lastConvertedIcebergSnapshotId` = S0 and converts delta version 2, 3. 3. Thread A creates an Iceberg transaction with parent snapshot S0. Because `lastConvertedIcebergSnapshotId` is also S0, it commits and update iceberg latest snapshot to S1. 4. Thread B creates an Iceberg transaction, with parent snapshot S1. Because `lastConvertedIcebergSnapshotId` is S0 != S1, it aborts the conversion. --- .../IcebergConversionTransaction.scala | 37 ++++++++++++++++++- .../icebergShaded/IcebergConverter.scala | 33 ++++++++++++++--- 2 files changed, 63 insertions(+), 7 deletions(-) diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala index 7745f9e55f6..df4cfb20465 100644 --- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala @@ -16,6 +16,8 @@ package org.apache.spark.sql.delta.icebergShaded +import java.util.ConcurrentModificationException + import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal @@ -29,7 +31,6 @@ import org.apache.spark.sql.delta.schema.SchemaUtils import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.conf.Configuration import shadedForDelta.org.apache.iceberg.{AppendFiles, DeleteFiles, OverwriteFiles, PendingUpdate, RewriteFiles, Transaction => IcebergTransaction} -import shadedForDelta.org.apache.iceberg.hadoop.HadoopTables import shadedForDelta.org.apache.iceberg.mapping.MappingUtil import shadedForDelta.org.apache.iceberg.mapping.NameMappingParser @@ -48,12 +49,15 @@ case object REPLACE_TABLE extends IcebergTableOp * @param conf Configuration for Iceberg Hadoop interactions. * @param postCommitSnapshot Latest Delta snapshot associated with this Iceberg commit. * @param tableOp How to instantiate the underlying Iceberg table. Defaults to WRITE_TABLE. + * @param lastConvertedIcebergSnapshotId the iceberg snapshot this Iceberg txn should write to. + * @param lastConvertedDeltaVersion the delta version this Iceberg txn starts from. */ class IcebergConversionTransaction( protected val catalogTable: CatalogTable, protected val conf: Configuration, protected val postCommitSnapshot: Snapshot, protected val tableOp: IcebergTableOp = WRITE_TABLE, + protected val lastConvertedIcebergSnapshotId: Option[Long] = None, protected val lastConvertedDeltaVersion: Option[Long] = None) extends DeltaLogging { /////////////////////////// @@ -197,7 +201,7 @@ class IcebergConversionTransaction( DeltaFileProviderUtils.createJsonStatsParser(postCommitSnapshot.statsSchema) /** Visible for testing. */ - private[icebergShaded]val txn = createIcebergTxn() + private[icebergShaded]val (txn, startFromSnapshotId) = withStartSnapshotId(createIcebergTxn()) /** Tracks if this transaction has already committed. You can only commit once. */ private var committed = false @@ -320,6 +324,25 @@ class IcebergConversionTransaction( .set(IcebergConverter.ICEBERG_NAME_MAPPING_PROPERTY, nameMapping) .commit() + // We ensure the iceberg txns are serializable by only allowing them to commit against + // lastConvertedIcebergSnapshotId. + // + // If the startFromSnapshotId is non-empty and not the same as lastConvertedIcebergSnapshotId, + // there is a new iceberg transaction committed after we read lastConvertedIcebergSnapshotId, + // and before this check. We explicitly abort by throwing exceptions. + // + // If startFromSnapshotId is empty, the txn must be one of the following: + // 1. CREATE_TABLE + // 2. Writing to an empty table + // 3. REPLACE_TABLE + // In either case this txn is safe to commit. + // + // Iceberg will further guarantee that txns passed this check are serializable. + if (startFromSnapshotId.isDefined && lastConvertedIcebergSnapshotId != startFromSnapshotId) { + throw new ConcurrentModificationException("Cannot commit because the converted " + + s"metadata is based on a stale iceberg snapshot $lastConvertedIcebergSnapshotId" + ) + } try { txn.commitTransaction() if (tableOp == CREATE_TABLE) { @@ -399,6 +422,16 @@ class IcebergConversionTransaction( // Helper Methods // //////////////////// + /** + * We fetch the txn table's current snapshot id before any writing is made on the transaction. + * This id should equal [[lastConvertedIcebergSnapshotId]] for the transaction to commit. + * + * @param txn the iceberg transaction + * @return txn and the snapshot id just before this txn + */ + private def withStartSnapshotId(txn: IcebergTransaction): (IcebergTransaction, Option[Long]) = + (txn, Option(txn.table().currentSnapshot()).map(_.snapshotId())) + private def recordIcebergCommit(errorOpt: Option[Throwable] = None): Unit = { val icebergTxnTypes = if (fileUpdates.nonEmpty) Map("icebergTxnTypes" -> fileUpdates.map(_.opType)) else Map.empty diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala index 04fa23cf88f..3b5ef1464ee 100644 --- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.fs.Path +import shadedForDelta.org.apache.iceberg.{Table => IcebergTable} import shadedForDelta.org.apache.iceberg.hive.{HiveCatalog, HiveTableOperations} import org.apache.spark.sql.SparkSession @@ -51,6 +52,9 @@ object IcebergConverter { val DELTA_TIMESTAMP_PROPERTY = "delta-timestamp" val ICEBERG_NAME_MAPPING_PROPERTY = "schema.name-mapping.default" + + def getLastConvertedDeltaVersion(table: Option[IcebergTable]): Option[Long] = + table.flatMap(_.properties().asScala.get(DELTA_VERSION_PROPERTY)).map(_.toLong) } /** @@ -235,13 +239,16 @@ class IcebergConverter(spark: SparkSession) catalogTable: CatalogTable): Option[(Long, Long)] = recordFrameProfile("Delta", "IcebergConverter.convertSnapshot") { val log = snapshotToConvert.deltaLog - val lastDeltaVersionConverted: Option[Long] = - loadLastDeltaVersionConverted(snapshotToConvert, catalogTable) + val lastConvertedIcebergTable = loadIcebergTable(snapshotToConvert, catalogTable) + val lastConvertedIcebergSnapshotId = + lastConvertedIcebergTable.flatMap(it => Option(it.currentSnapshot())).map(_.snapshotId()) + val lastDeltaVersionConverted = IcebergConverter + .getLastConvertedDeltaVersion(lastConvertedIcebergTable) val maxCommitsToConvert = spark.sessionState.conf.getConf(DeltaSQLConf.ICEBERG_MAX_COMMITS_TO_CONVERT) // Nth to convert - if (lastDeltaVersionConverted.exists(_ == snapshotToConvert.version)) { + if (lastDeltaVersionConverted.contains(snapshotToConvert.version)) { return None } @@ -276,7 +283,8 @@ class IcebergConverter(spark: SparkSession) } val icebergTxn = new IcebergConversionTransaction( - catalogTable, log.newDeltaHadoopConf(), snapshotToConvert, tableOp, lastDeltaVersionConverted) + catalogTable, log.newDeltaHadoopConf(), snapshotToConvert, tableOp, + lastConvertedIcebergSnapshotId, lastDeltaVersionConverted) // Write out the actions taken since the last conversion (or since table creation). // This is done in batches, with each batch corresponding either to one delta file, @@ -349,9 +357,24 @@ class IcebergConverter(spark: SparkSession) override def loadLastDeltaVersionConverted( snapshot: Snapshot, catalogTable: CatalogTable): Option[Long] = recordFrameProfile("Delta", "IcebergConverter.loadLastDeltaVersionConverted") { - catalogTable.properties.get(IcebergConverter.DELTA_VERSION_PROPERTY).map(_.toLong) + IcebergConverter.getLastConvertedDeltaVersion(loadIcebergTable(snapshot, catalogTable)) } + protected def loadIcebergTable( + snapshot: Snapshot, catalogTable: CatalogTable): Option[IcebergTable] = { + recordFrameProfile("Delta", "IcebergConverter.loadLastConvertedIcebergTable") { + val hiveCatalog = IcebergTransactionUtils + .createHiveCatalog(snapshot.deltaLog.newDeltaHadoopConf()) + val icebergTableId = IcebergTransactionUtils + .convertSparkTableIdentifierToIcebergHive(catalogTable.identifier) + if (hiveCatalog.tableExists(icebergTableId)) { + Some(hiveCatalog.loadTable(icebergTableId)) + } else { + None + } + } + } + /** * Build an iceberg TransactionHelper from the provided txn, and commit the set of changes * specified by the actionsToCommit.