diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala
index 7745f9e55f6..df4cfb20465 100644
--- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala
+++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala
@@ -16,6 +16,8 @@
 
 package org.apache.spark.sql.delta.icebergShaded
 
+import java.util.ConcurrentModificationException
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
@@ -29,7 +31,6 @@
 import org.apache.spark.sql.delta.schema.SchemaUtils
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.conf.Configuration
 import shadedForDelta.org.apache.iceberg.{AppendFiles, DeleteFiles, OverwriteFiles, PendingUpdate, RewriteFiles, Transaction => IcebergTransaction}
-import shadedForDelta.org.apache.iceberg.hadoop.HadoopTables
 import shadedForDelta.org.apache.iceberg.mapping.MappingUtil
 import shadedForDelta.org.apache.iceberg.mapping.NameMappingParser
@@ -48,12 +49,15 @@ case object REPLACE_TABLE extends IcebergTableOp
  * @param conf Configuration for Iceberg Hadoop interactions.
  * @param postCommitSnapshot Latest Delta snapshot associated with this Iceberg commit.
  * @param tableOp How to instantiate the underlying Iceberg table. Defaults to WRITE_TABLE.
+ * @param lastConvertedIcebergSnapshotId the Iceberg snapshot id this txn should commit against.
+ * @param lastConvertedDeltaVersion the Delta version this Iceberg txn starts from.
  */
 class IcebergConversionTransaction(
     protected val catalogTable: CatalogTable,
     protected val conf: Configuration,
     protected val postCommitSnapshot: Snapshot,
     protected val tableOp: IcebergTableOp = WRITE_TABLE,
+    protected val lastConvertedIcebergSnapshotId: Option[Long] = None,
     protected val lastConvertedDeltaVersion: Option[Long] = None) extends DeltaLogging {
 
   ///////////////////////////
@@ -197,7 +201,7 @@ class IcebergConversionTransaction(
     DeltaFileProviderUtils.createJsonStatsParser(postCommitSnapshot.statsSchema)
 
   /** Visible for testing. */
-  private[icebergShaded]val txn = createIcebergTxn()
+  private[icebergShaded] val (txn, startFromSnapshotId) = withStartSnapshotId(createIcebergTxn())
 
   /** Tracks if this transaction has already committed. You can only commit once. */
   private var committed = false
@@ -320,6 +324,25 @@ class IcebergConversionTransaction(
       .set(IcebergConverter.ICEBERG_NAME_MAPPING_PROPERTY, nameMapping)
       .commit()
 
+    // We ensure the Iceberg txns are serializable by only allowing them to commit against
+    // lastConvertedIcebergSnapshotId.
+    //
+    // If startFromSnapshotId is non-empty and differs from lastConvertedIcebergSnapshotId,
+    // a new Iceberg transaction committed after we read lastConvertedIcebergSnapshotId
+    // and before this check. We explicitly abort by throwing an exception.
+    //
+    // If startFromSnapshotId is empty, the txn must be one of the following:
+    // 1. CREATE_TABLE
+    // 2. Writing to an empty table
+    // 3. REPLACE_TABLE
+    // In any of these cases this txn is safe to commit.
+    //
+    // Iceberg will further guarantee that txns that pass this check are serializable.
+    if (startFromSnapshotId.isDefined && lastConvertedIcebergSnapshotId != startFromSnapshotId) {
+      throw new ConcurrentModificationException("Cannot commit because the converted " +
+        s"metadata is based on a stale Iceberg snapshot $lastConvertedIcebergSnapshotId"
+      )
+    }
     try {
       txn.commitTransaction()
       if (tableOp == CREATE_TABLE) {
@@ -399,6 +422,16 @@
   // Helper Methods //
   ////////////////////
 
+  /**
+   * Fetches the txn table's current snapshot id before any writes are made by the transaction.
+   * This id must equal [[lastConvertedIcebergSnapshotId]] for the transaction to commit.
+   *
+   * @param txn the Iceberg transaction
+   * @return the txn and the snapshot id the table was at just before this txn
+   */
+  private def withStartSnapshotId(txn: IcebergTransaction): (IcebergTransaction, Option[Long]) =
+    (txn, Option(txn.table().currentSnapshot()).map(_.snapshotId()))
+
   private def recordIcebergCommit(errorOpt: Option[Throwable] = None): Unit = {
     val icebergTxnTypes =
       if (fileUpdates.nonEmpty) Map("icebergTxnTypes" -> fileUpdates.map(_.opType)) else Map.empty
diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala
index 04fa23cf88f..3b5ef1464ee 100644
--- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala
+++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.fs.Path
+import shadedForDelta.org.apache.iceberg.{Table => IcebergTable}
 import shadedForDelta.org.apache.iceberg.hive.{HiveCatalog, HiveTableOperations}
 
 import org.apache.spark.sql.SparkSession
@@ -51,6 +52,9 @@ object IcebergConverter {
   val DELTA_TIMESTAMP_PROPERTY = "delta-timestamp"
 
   val ICEBERG_NAME_MAPPING_PROPERTY = "schema.name-mapping.default"
+
+  def getLastConvertedDeltaVersion(table: Option[IcebergTable]): Option[Long] =
+    table.flatMap(_.properties().asScala.get(DELTA_VERSION_PROPERTY)).map(_.toLong)
 }
 
 /**
@@ -235,13 +239,16 @@
       catalogTable: CatalogTable): Option[(Long, Long)] =
     recordFrameProfile("Delta", "IcebergConverter.convertSnapshot") {
       val log = snapshotToConvert.deltaLog
-      val lastDeltaVersionConverted: Option[Long] =
-        loadLastDeltaVersionConverted(snapshotToConvert, catalogTable)
+      val lastConvertedIcebergTable = loadIcebergTable(snapshotToConvert, catalogTable)
+      val lastConvertedIcebergSnapshotId =
+        lastConvertedIcebergTable.flatMap(it => Option(it.currentSnapshot())).map(_.snapshotId())
+      val lastDeltaVersionConverted = IcebergConverter
+        .getLastConvertedDeltaVersion(lastConvertedIcebergTable)
       val maxCommitsToConvert =
         spark.sessionState.conf.getConf(DeltaSQLConf.ICEBERG_MAX_COMMITS_TO_CONVERT)
 
       // Nth to convert
-      if (lastDeltaVersionConverted.exists(_ == snapshotToConvert.version)) {
+      if (lastDeltaVersionConverted.contains(snapshotToConvert.version)) {
         return None
       }
 
@@ -276,7 +283,8 @@
       }
 
       val icebergTxn = new IcebergConversionTransaction(
-        catalogTable, log.newDeltaHadoopConf(), snapshotToConvert, tableOp, lastDeltaVersionConverted)
+        catalogTable, log.newDeltaHadoopConf(), snapshotToConvert, tableOp,
+        lastConvertedIcebergSnapshotId, lastDeltaVersionConverted)
 
       // Write out the actions taken since the last conversion (or since table creation).
       // This is done in batches, with each batch corresponding either to one delta file,
@@ -349,9 +357,24 @@
   override def loadLastDeltaVersionConverted(
       snapshot: Snapshot, catalogTable: CatalogTable): Option[Long] =
     recordFrameProfile("Delta", "IcebergConverter.loadLastDeltaVersionConverted") {
-      catalogTable.properties.get(IcebergConverter.DELTA_VERSION_PROPERTY).map(_.toLong)
+      IcebergConverter.getLastConvertedDeltaVersion(loadIcebergTable(snapshot, catalogTable))
     }
 
+  protected def loadIcebergTable(
+      snapshot: Snapshot, catalogTable: CatalogTable): Option[IcebergTable] = {
+    recordFrameProfile("Delta", "IcebergConverter.loadLastConvertedIcebergTable") {
+      val hiveCatalog = IcebergTransactionUtils
+        .createHiveCatalog(snapshot.deltaLog.newDeltaHadoopConf())
+      val icebergTableId = IcebergTransactionUtils
+        .convertSparkTableIdentifierToIcebergHive(catalogTable.identifier)
+      if (hiveCatalog.tableExists(icebergTableId)) {
+        Some(hiveCatalog.loadTable(icebergTableId))
+      } else {
+        None
+      }
+    }
+  }
+
   /**
    * Build an iceberg TransactionHelper from the provided txn, and commit the set of changes
    * specified by the actionsToCommit.
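The concurrency check this patch adds is easiest to see in isolation. Below is a minimal, self-contained sketch of the same optimistic "record the start snapshot id, compare at commit" pattern. SketchTable, SketchTxn, and SerializabilitySketch are hypothetical stand-ins for the shaded Iceberg Table/Transaction types; only the control flow mirrors the patch.

import java.util.ConcurrentModificationException

// Hypothetical stand-in for an Iceberg table: exposes only its current snapshot id,
// which is None for a table with no snapshots yet.
final case class SketchTable(currentSnapshotId: Option[Long])

// Hypothetical stand-in for an Iceberg transaction on `table`.
final class SketchTxn(table: SketchTable) {
  // Recorded when the txn is created, before any writes (mirrors withStartSnapshotId).
  val startFromSnapshotId: Option[Long] = table.currentSnapshotId

  // Commit only if the snapshot we started from is still the last converted one;
  // otherwise another conversion slipped in between and we must abort.
  def commit(lastConvertedIcebergSnapshotId: Option[Long]): Unit = {
    if (startFromSnapshotId.isDefined &&
        lastConvertedIcebergSnapshotId != startFromSnapshotId) {
      throw new ConcurrentModificationException(
        s"started from $startFromSnapshotId but last converted is $lastConvertedIcebergSnapshotId")
    }
    // A real implementation would call txn.commitTransaction() here.
  }
}

object SerializabilitySketch extends App {
  // Happy path: the table is still at the snapshot our conversion was based on.
  new SketchTxn(SketchTable(Some(7L))).commit(lastConvertedIcebergSnapshotId = Some(7L))

  // startFromSnapshotId empty (CREATE_TABLE / REPLACE_TABLE / empty table): check is skipped.
  new SketchTxn(SketchTable(None)).commit(lastConvertedIcebergSnapshotId = Some(7L))

  // Race: someone committed snapshot 8 after we read lastConvertedIcebergSnapshotId = 7.
  val racer = new SketchTxn(SketchTable(Some(8L)))
  try racer.commit(lastConvertedIcebergSnapshotId = Some(7L))
  catch { case e: ConcurrentModificationException => println(s"aborted: ${e.getMessage}") }
}

As the comment in the patch notes, the explicit check only rejects commits built on a stale snapshot; for txns that pass it, Iceberg's own commit machinery supplies the remaining serializability guarantee.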
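The other behavioral change is where the last converted Delta version is read from: previously the Spark CatalogTable's properties, now the Iceberg table's own properties via getLastConvertedDeltaVersion, so a missing Iceberg table naturally yields None. A sketch of that lookup, assuming a plain java.util.Map in place of IcebergTable.properties() and a placeholder property key (the real key is the IcebergConverter.DELTA_VERSION_PROPERTY constant, whose value is not shown in this diff):

import scala.collection.JavaConverters._

object VersionLookupSketch extends App {
  // Placeholder key, assumed for illustration only.
  val DELTA_VERSION_PROPERTY = "delta-version"

  // Mirrors getLastConvertedDeltaVersion: `table` is None when the Iceberg table
  // does not exist yet; the java.util.Map stands in for IcebergTable.properties().
  def getLastConvertedDeltaVersion(table: Option[java.util.Map[String, String]]): Option[Long] =
    table.flatMap(_.asScala.get(DELTA_VERSION_PROPERTY)).map(_.toLong)

  val props = new java.util.HashMap[String, String]()
  props.put(DELTA_VERSION_PROPERTY, "12")
  println(getLastConvertedDeltaVersion(Some(props))) // Some(12)
  println(getLastConvertedDeltaVersion(None))        // None: nothing converted yet
}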