[Spark] Fix race condition in Uniform conversion (#3189)
## Description
This PR fixes a race condition in the UniForm Iceberg Converter.

Before this change, the UniForm Iceberg Converter executed as follows:
1. Read `lastConvertedDeltaVersion` from the latest Iceberg snapshot
2. Convert the Delta commits starting from `lastConvertedDeltaVersion` into Iceberg snapshots
3. Commit the Iceberg snapshots.

When multiple Iceberg conversion threads run concurrently, a race condition may cause a single Delta commit to be written into multiple Iceberg snapshots, corrupting data.

As an example, consider a UniForm table whose latest Delta version and Iceberg version are both 1. Two threads, A and B, start writing to the Delta table.

1. Thread A writes Delta version 2, reads `lastConvertedDeltaVersion` = 1, and converts Delta version 2.
2. Thread B writes Delta version 3, reads `lastConvertedDeltaVersion` = 1, and converts Delta versions 2 and 3.
3. Thread A commits Iceberg version 2, including converted Delta version 2.
4. Thread B commits Iceberg version 3, including converted Delta versions 2 and 3.

When both threads commit to Iceberg, Delta version 2 appears in the Iceberg history twice, as two different snapshots. If version 2 contains an AddFile, the same data is inserted into Iceberg twice.
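
To make the failure concrete, here is a minimal, hypothetical Scala sketch of that unguarded read-convert-commit flow; `IcebergSnapshot`, `latest`, and `convert` are illustrative stand-ins, not the real converter API. Nothing ties the commit to the snapshot that was read, so two threads that observe the same base both re-commit Delta version 2:

```scala
import java.util.concurrent.atomic.AtomicReference

// Toy stand-in for the Iceberg table's latest snapshot.
final case class IcebergSnapshot(id: Long, lastConvertedDeltaVersion: Long)

object UnguardedConverterDemo {
  // The table starts at snapshot S0, with Delta version 1 already converted.
  private val latest = new AtomicReference(IcebergSnapshot(0L, 1L))

  // Steps 1-3 above: read, convert, commit. The commit never verifies that
  // `base` is still the table's latest snapshot.
  def convert(base: IcebergSnapshot, upToDeltaVersion: Long): Unit = {
    val versions = (base.lastConvertedDeltaVersion + 1) to upToDeltaVersion
    latest.set(IcebergSnapshot(base.id + 1, upToDeltaVersion))
    println(s"snapshot ${base.id + 1} commits delta versions ${versions.mkString(", ")}")
  }

  def main(args: Array[String]): Unit = {
    val s0 = latest.get()
    convert(s0, 2) // thread A: commits Delta version 2
    convert(s0, 3) // thread B read the same base: commits Delta version 2 again
  }
}
```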

Our fix works as follows:
1. Read `lastConvertedDeltaVersion` and **a new field** `lastConvertedIcebergSnapshotId` from the latest Iceberg snapshot
2. Convert the Delta commits starting from `lastConvertedDeltaVersion` into Iceberg snapshots
3. Before committing to Iceberg, check that the base snapshot ID of this transaction equals `lastConvertedIcebergSnapshotId` (**this check is the core of this change**)
4. Commit the Iceberg snapshots.

This change ensures that we only commit against a specific Iceberg snapshot, and abort if the snapshot we want to commit against is no longer the latest one. Applied to the example above, the fix blocks the duplicate commit as follows (a sketch of the guarded flow appears after this walkthrough):

1. Thread A writes Delta version 2, reads `lastConvertedDeltaVersion` = 1 and `lastConvertedIcebergSnapshotId` = S0, and converts Delta version 2.
2. Thread B writes Delta version 3, reads `lastConvertedDeltaVersion` = 1 and `lastConvertedIcebergSnapshotId` = S0, and converts Delta versions 2 and 3.
3. Thread A creates an Iceberg transaction with parent snapshot S0. Because `lastConvertedIcebergSnapshotId` is also S0, it commits and updates the latest Iceberg snapshot to S1.
4. Thread B creates an Iceberg transaction with parent snapshot S1. Because `lastConvertedIcebergSnapshotId` is S0 != S1, it aborts the conversion.
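
Continuing the same toy model (again with illustrative names, not the real API), the guard is essentially a compare-and-set on the snapshot ID. Here an `AtomicReference` supplies the atomicity; in the actual change the check runs just before `commitTransaction()`, and Iceberg's commit protocol guarantees that transactions passing it are serializable:

```scala
import java.util.ConcurrentModificationException
import java.util.concurrent.atomic.AtomicReference

final case class IcebergSnapshot(id: Long, lastConvertedDeltaVersion: Long)

object GuardedConverterDemo {
  private val latest = new AtomicReference(IcebergSnapshot(0L, 1L))

  def convert(base: IcebergSnapshot, upToDeltaVersion: Long): Unit = {
    val versions = (base.lastConvertedDeltaVersion + 1) to upToDeltaVersion
    val next = IcebergSnapshot(base.id + 1, upToDeltaVersion)
    // The core of the fix: commit only if `base` is still the latest snapshot.
    if (!latest.compareAndSet(base, next)) {
      throw new ConcurrentModificationException(
        s"Cannot commit because the converted metadata is based on a stale " +
          s"iceberg snapshot ${base.id}")
    }
    println(s"snapshot ${next.id} commits delta versions ${versions.mkString(", ")}")
  }

  def main(args: Array[String]): Unit = {
    val s0 = latest.get()
    convert(s0, 2) // thread A: S0 is still the latest, so it commits S1
    convert(s0, 3) // thread B: the latest is now S1 != S0, so it throws
  }
}
```

Running the sketch, thread A commits snapshot 1 while thread B fails with a `ConcurrentModificationException`, mirroring step 4 above.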
harperjiang authored Jun 3, 2024
1 parent c0b3c97 commit be9718d
Showing 2 changed files with 63 additions and 7 deletions.
@@ -16,6 +16,8 @@

package org.apache.spark.sql.delta.icebergShaded

import java.util.ConcurrentModificationException

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
@@ -29,7 +31,6 @@ import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import shadedForDelta.org.apache.iceberg.{AppendFiles, DeleteFiles, OverwriteFiles, PendingUpdate, RewriteFiles, Transaction => IcebergTransaction}
import shadedForDelta.org.apache.iceberg.hadoop.HadoopTables
import shadedForDelta.org.apache.iceberg.mapping.MappingUtil
import shadedForDelta.org.apache.iceberg.mapping.NameMappingParser

@@ -48,12 +49,15 @@ case object REPLACE_TABLE extends IcebergTableOp
* @param conf Configuration for Iceberg Hadoop interactions.
* @param postCommitSnapshot Latest Delta snapshot associated with this Iceberg commit.
* @param tableOp How to instantiate the underlying Iceberg table. Defaults to WRITE_TABLE.
* @param lastConvertedIcebergSnapshotId the iceberg snapshot this Iceberg txn should write to.
* @param lastConvertedDeltaVersion the delta version this Iceberg txn starts from.
*/
class IcebergConversionTransaction(
protected val catalogTable: CatalogTable,
protected val conf: Configuration,
protected val postCommitSnapshot: Snapshot,
protected val tableOp: IcebergTableOp = WRITE_TABLE,
protected val lastConvertedIcebergSnapshotId: Option[Long] = None,
protected val lastConvertedDeltaVersion: Option[Long] = None) extends DeltaLogging {

///////////////////////////
@@ -197,7 +201,7 @@ class IcebergConversionTransaction(
DeltaFileProviderUtils.createJsonStatsParser(postCommitSnapshot.statsSchema)

/** Visible for testing. */
private[icebergShaded]val txn = createIcebergTxn()
private[icebergShaded]val (txn, startFromSnapshotId) = withStartSnapshotId(createIcebergTxn())

/** Tracks if this transaction has already committed. You can only commit once. */
private var committed = false
@@ -320,6 +324,25 @@
.set(IcebergConverter.ICEBERG_NAME_MAPPING_PROPERTY, nameMapping)
.commit()

// We ensure the iceberg txns are serializable by only allowing them to commit against
// lastConvertedIcebergSnapshotId.
//
// If the startFromSnapshotId is non-empty and not the same as lastConvertedIcebergSnapshotId,
// there is a new iceberg transaction committed after we read lastConvertedIcebergSnapshotId,
// and before this check. We explicitly abort by throwing exceptions.
//
// If startFromSnapshotId is empty, the txn must be one of the following:
// 1. CREATE_TABLE
// 2. Writing to an empty table
// 3. REPLACE_TABLE
// In either case this txn is safe to commit.
//
// Iceberg will further guarantee that txns passed this check are serializable.
if (startFromSnapshotId.isDefined && lastConvertedIcebergSnapshotId != startFromSnapshotId) {
throw new ConcurrentModificationException("Cannot commit because the converted " +
s"metadata is based on a stale iceberg snapshot $lastConvertedIcebergSnapshotId"
)
}
try {
txn.commitTransaction()
if (tableOp == CREATE_TABLE) {
@@ -399,6 +422,16 @@
// Helper Methods //
////////////////////

/**
* We fetch the txn table's current snapshot id before any writing is made on the transaction.
* This id should equal [[lastConvertedIcebergSnapshotId]] for the transaction to commit.
*
* @param txn the iceberg transaction
* @return txn and the snapshot id just before this txn
*/
private def withStartSnapshotId(txn: IcebergTransaction): (IcebergTransaction, Option[Long]) =
(txn, Option(txn.table().currentSnapshot()).map(_.snapshotId()))

private def recordIcebergCommit(errorOpt: Option[Throwable] = None): Unit = {
val icebergTxnTypes =
if (fileUpdates.nonEmpty) Map("icebergTxnTypes" -> fileUpdates.map(_.opType)) else Map.empty
@@ -30,6 +30,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.fs.Path
import shadedForDelta.org.apache.iceberg.{Table => IcebergTable}
import shadedForDelta.org.apache.iceberg.hive.{HiveCatalog, HiveTableOperations}

import org.apache.spark.sql.SparkSession
@@ -51,6 +52,9 @@ object IcebergConverter {
val DELTA_TIMESTAMP_PROPERTY = "delta-timestamp"

val ICEBERG_NAME_MAPPING_PROPERTY = "schema.name-mapping.default"

def getLastConvertedDeltaVersion(table: Option[IcebergTable]): Option[Long] =
table.flatMap(_.properties().asScala.get(DELTA_VERSION_PROPERTY)).map(_.toLong)
}

/**
@@ -235,13 +239,16 @@ class IcebergConverter(spark: SparkSession)
catalogTable: CatalogTable): Option[(Long, Long)] =
recordFrameProfile("Delta", "IcebergConverter.convertSnapshot") {
val log = snapshotToConvert.deltaLog
val lastDeltaVersionConverted: Option[Long] =
loadLastDeltaVersionConverted(snapshotToConvert, catalogTable)
val lastConvertedIcebergTable = loadIcebergTable(snapshotToConvert, catalogTable)
val lastConvertedIcebergSnapshotId =
lastConvertedIcebergTable.flatMap(it => Option(it.currentSnapshot())).map(_.snapshotId())
val lastDeltaVersionConverted = IcebergConverter
.getLastConvertedDeltaVersion(lastConvertedIcebergTable)
val maxCommitsToConvert =
spark.sessionState.conf.getConf(DeltaSQLConf.ICEBERG_MAX_COMMITS_TO_CONVERT)

// Nth to convert
if (lastDeltaVersionConverted.exists(_ == snapshotToConvert.version)) {
if (lastDeltaVersionConverted.contains(snapshotToConvert.version)) {
return None
}

@@ -276,7 +283,8 @@ }
}

val icebergTxn = new IcebergConversionTransaction(
catalogTable, log.newDeltaHadoopConf(), snapshotToConvert, tableOp, lastDeltaVersionConverted)
catalogTable, log.newDeltaHadoopConf(), snapshotToConvert, tableOp,
lastConvertedIcebergSnapshotId, lastDeltaVersionConverted)

// Write out the actions taken since the last conversion (or since table creation).
// This is done in batches, with each batch corresponding either to one delta file,
@@ -349,9 +357,24 @@
override def loadLastDeltaVersionConverted(
snapshot: Snapshot, catalogTable: CatalogTable): Option[Long] =
recordFrameProfile("Delta", "IcebergConverter.loadLastDeltaVersionConverted") {
catalogTable.properties.get(IcebergConverter.DELTA_VERSION_PROPERTY).map(_.toLong)
IcebergConverter.getLastConvertedDeltaVersion(loadIcebergTable(snapshot, catalogTable))
}

protected def loadIcebergTable(
snapshot: Snapshot, catalogTable: CatalogTable): Option[IcebergTable] = {
recordFrameProfile("Delta", "IcebergConverter.loadLastConvertedIcebergTable") {
val hiveCatalog = IcebergTransactionUtils
.createHiveCatalog(snapshot.deltaLog.newDeltaHadoopConf())
val icebergTableId = IcebergTransactionUtils
.convertSparkTableIdentifierToIcebergHive(catalogTable.identifier)
if (hiveCatalog.tableExists(icebergTableId)) {
Some(hiveCatalog.loadTable(icebergTableId))
} else {
None
}
}
}

/**
* Build an iceberg TransactionHelper from the provided txn, and commit the set of changes
* specified by the actionsToCommit.