Skip to content

Commit

Permalink
[Spark] Enable UniForm Without Rewrite (#3379)
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
Users can now enable Delta UniForm directly with the `ALTER TABLE SET
TBLPROPERTIES` command. This only converts the corresponding metadata
from `delta` to `iceberg`, without rewriting the underlying parquet
files.

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->

## How was this patch tested?
Through manual tests and e2e tests.

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

## Does this PR introduce _any_ user-facing changes?
Yes, this PR lets users enable Delta UniForm directly via the `ALTER TABLE
SET TBLPROPERTIES` command.
<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
  • Loading branch information
xzhseh authored Jul 16, 2024
1 parent 573a57f commit 7b35259
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 46 deletions.
4 changes: 2 additions & 2 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,7 @@
},
"DELETION_VECTORS_SHOULD_BE_DISABLED" : {
"message" : [
"IcebergCompatV<version> requires Deletion Vectors to be disabled on the table. Please use the ALTER TABLE DROP FEATURE command to disable Deletion Vectors and to remove the existing Deletion Vectors from the table."
"IcebergCompatV<version> requires Deletion Vectors to be disabled on the table first. Then run REORG PURGE command to purge the Deletion Vectors on the table."
]
},
"DISABLING_REQUIRED_TABLE_FEATURE" : {
Expand Down Expand Up @@ -1152,7 +1152,7 @@
},
"VERSION_MUTUAL_EXCLUSIVE" : {
"message" : [
"Only one IcebergCompat version can be enabled."
"Only one IcebergCompat version can be enabled, please explicitly disable all other IcebergCompat versions that are not needed."
]
},
"WRONG_REQUIRED_TABLE_PROPERTY" : {
Expand Down
90 changes: 53 additions & 37 deletions spark/src/main/scala/org/apache/spark/sql/delta/IcebergCompat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ object IcebergCompatV1 extends IcebergCompat(
CheckAddFileHasStats,
CheckNoPartitionEvolution,
CheckNoListMapNullType,
CheckNoDeletionVector,
CheckVersionChangeNeedsRewrite)
CheckDeletionVectorDisabled
)
)

object IcebergCompatV2 extends IcebergCompat(
Expand All @@ -61,8 +61,8 @@ object IcebergCompatV2 extends IcebergCompat(
CheckAddFileHasStats,
CheckTypeInV2AllowList,
CheckNoPartitionEvolution,
CheckNoDeletionVector,
CheckVersionChangeNeedsRewrite)
CheckDeletionVectorDisabled
)
)

/**
Expand Down Expand Up @@ -106,14 +106,16 @@ case class IcebergCompat(
prevSnapshot: Snapshot,
newestProtocol: Protocol,
newestMetadata: Metadata,
isCreatingOrReorgTable: Boolean,
operation: Option[DeltaOperations.Operation],
actions: Seq[Action]): (Option[Protocol], Option[Metadata]) = {
val prevProtocol = prevSnapshot.protocol
val prevMetadata = prevSnapshot.metadata
val wasEnabled = this.isEnabled(prevMetadata)
val isEnabled = this.isEnabled(newestMetadata)
val tableId = newestMetadata.id

val isCreatingOrReorgTable = UniversalFormat.isCreatingOrReorgTable(operation)

(wasEnabled, isEnabled) match {
case (_, false) => (None, None) // not enable or disabling, Ignore
case (_, true) => // Enabling now or already-enabled
Expand Down Expand Up @@ -186,10 +188,15 @@ case class IcebergCompat(
} else None

// Apply additional checks
val context = IcebergCompatContext(prevSnapshot,
val context = IcebergCompatContext(
prevSnapshot,
protocolResult.getOrElse(newestProtocol),
metadataResult.getOrElse(newestMetadata),
isCreatingOrReorgTable, actions, tableId, version)
operation,
actions,
tableId,
version
)
checks.foreach(_.apply(context))

(protocolResult, metadataResult)
Expand Down Expand Up @@ -241,7 +248,10 @@ object IcebergCompat extends DeltaLogging {
* @return true if the target version is enabled on the table.
*/
def isVersionEnabled(metadata: Metadata, version: Integer): Boolean =
knownVersions.exists{ case (_, v) => v == version }
knownVersions.exists {
case (config, v) =>
(v == version) && (config.fromMetaData(metadata).getOrElse(false))
}
}

/**
Expand Down Expand Up @@ -293,7 +303,7 @@ case class IcebergCompatContext(
prevSnapshot: Snapshot,
newestProtocol: Protocol,
newestMetadata: Metadata,
isCreatingOrReorgTable: Boolean,
operation: Option[DeltaOperations.Operation],
actions: Seq[Action],
tableId: String,
version: Integer) {
Expand All @@ -310,9 +320,9 @@ trait IcebergCompatCheck extends (IcebergCompatContext => Unit)
object CheckOnlySingleVersionEnabled extends IcebergCompatCheck {
override def apply(context: IcebergCompatContext): Unit = {
val numEnabled = IcebergCompat.knownVersions
.map{ case (config, _) =>
if (config.fromMetaData(context.newestMetadata).getOrElse(false)) 1 else 0 }
.sum
.map { case (config, _) =>
if (config.fromMetaData(context.newestMetadata).getOrElse(false)) 1 else 0
}.sum
if (numEnabled > 1) {
throw DeltaErrors.icebergCompatVersionMutualExclusive(context.version)
}
Expand Down Expand Up @@ -388,34 +398,40 @@ object CheckTypeInV2AllowList extends IcebergCompatCheck {
}
}

object CheckNoDeletionVector extends IcebergCompatCheck {

override def apply(context: IcebergCompatContext): Unit = {
// Check for incompatible table features;
// Deletion Vectors cannot be writeable; Note that concurrent txns are also covered
// to NOT write deletion vectors as that txn would need to make DVs writable, which
// would conflict with current txn because of metadata change.
if (DeletionVectorUtils.deletionVectorsWritable(
context.newestProtocol, context.newestMetadata)) {
throw DeltaErrors.icebergCompatDeletionVectorsShouldBeDisabledException(context.version)
}
}
}


/**
* Check if change IcebergCompat version needs a REORG operation
* Check that deletion vectors are disabled, based on the previous snapshot
* and/or the newest metadata and protocol, depending on whether the operation
* is REORG UPGRADE UNIFORM or not.
*/
object CheckVersionChangeNeedsRewrite extends IcebergCompatCheck {

private val versionChangesWithoutRewrite: Map[Int, Set[Int]] =
Map(0 -> Set(0, 1), 1 -> Set(0, 1), 2 -> Set(0, 1, 2))
object CheckDeletionVectorDisabled extends IcebergCompatCheck {
override def apply(context: IcebergCompatContext): Unit = {
if (!context.isCreatingOrReorgTable) {
val oldVersion = IcebergCompat.getEnabledVersion(context.prevMetadata).getOrElse(0)
val allowedChanges = versionChangesWithoutRewrite.getOrElse(oldVersion, Set.empty[Int])
if (!allowedChanges.contains(context.version)) {
throw DeltaErrors.icebergCompatChangeVersionNeedRewrite(oldVersion, context.version)
if (context.newestProtocol.isFeatureSupported(DeletionVectorsTableFeature)) {
// note: user will need to *separately* disable deletion vectors if this check fails,
// i.e., ALTER TABLE SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'false');
val isReorgUpgradeUniform = UniversalFormat.isReorgUpgradeUniform(context.operation)
// for REORG UPGRADE UNIFORM, we only need to check whether DV
// is enabled in the newest metadata and protocol. This conforms with
// the semantics of REORG UPGRADE UNIFORM, which for now automatically disables
// DV and rewrites all the parquet files with the DVs removed.
if (isReorgUpgradeUniform) {
if (DeletionVectorUtils.deletionVectorsWritable(
protocol = context.newestProtocol,
metadata = context.newestMetadata
)) {
throw DeltaErrors.icebergCompatDeletionVectorsShouldBeDisabledException(context.version)
}
} else {
// for other commands, we need to check whether DV is disabled from the
// previous snapshot, in case there are concurrent writers.
// plus, we also need to check from the newest metadata and protocol,
// in case we are creating a new uniform table with DV enabled.
if (DeletionVectorUtils.deletionVectorsWritable(context.prevSnapshot) ||
DeletionVectorUtils.deletionVectorsWritable(
protocol = context.newestProtocol,
metadata = context.newestMetadata
)) {
throw DeltaErrors.icebergCompatDeletionVectorsShouldBeDisabledException(context.version)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1717,7 +1717,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
snapshot,
newestProtocol = protocol, // Note: this will try to use `newProtocol`
newestMetadata = metadata, // Note: this will try to use `newMetadata`
isCreatingNewTable || op.isInstanceOf[DeltaOperations.UpgradeUniformProperties],
Some(op),
otherActions
)
newProtocol = protocolUpdate1.orElse(newProtocol)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,35 @@ object UniversalFormat extends DeltaLogging {
val HUDI_FORMAT = "hudi"
val SUPPORTED_FORMATS = Set(HUDI_FORMAT, ICEBERG_FORMAT)

/**
 * Tell whether an operation counts as creating or reorg-ing a table, i.e. whether it is a
 * CREATE TABLE, REPLACE TABLE or REORG UPGRADE UNIFORM command.
 *
 * @param op the delta operation to inspect, if any.
 * @return true for the three commands above and also when no operation is supplied;
 *         false for every other operation.
 */
def isCreatingOrReorgTable(op: Option[DeltaOperations.Operation]): Boolean = op match {
  // An absent operation counts as creating; this is to conform with the semantics in
  // `enforceDependenciesInConfiguration`.
  case None => true
  case Some(_: DeltaOperations.CreateTable) => true
  // REPLACE TABLE is also considered creating a table, to preserve the semantics of
  // `isCreatingNewTable` in `OptimisticTransaction`.
  case Some(_: DeltaOperations.ReplaceTable) => true
  case Some(_: DeltaOperations.UpgradeUniformProperties) => true
  case _ => false
}

/**
 * Tell whether an operation is the REORG UPGRADE UNIFORM command.
 *
 * @param op the delta operation to inspect, if any.
 * @return true only when an operation is present and it is a
 *         `DeltaOperations.UpgradeUniformProperties`; false otherwise, including when
 *         no operation is supplied.
 */
def isReorgUpgradeUniform(op: Option[DeltaOperations.Operation]): Boolean =
  op.exists {
    case _: DeltaOperations.UpgradeUniformProperties => true
    case _ => false
  }

def icebergEnabled(metadata: Metadata): Boolean = {
DeltaConfigs.UNIVERSAL_FORMAT_ENABLED_FORMATS.fromMetaData(metadata).contains(ICEBERG_FORMAT)
}
Expand Down Expand Up @@ -83,11 +112,11 @@ object UniversalFormat extends DeltaLogging {
snapshot: Snapshot,
newestProtocol: Protocol,
newestMetadata: Metadata,
isCreatingOrReorgTable: Boolean,
operation: Option[DeltaOperations.Operation],
actions: Seq[Action]): (Option[Protocol], Option[Metadata]) = {
enforceHudiDependencies(newestMetadata, snapshot)
enforceIcebergInvariantsAndDependencies(
snapshot, newestProtocol, newestMetadata, isCreatingOrReorgTable, actions)
snapshot, newestProtocol, newestMetadata, operation, actions)
}

/**
Expand Down Expand Up @@ -125,7 +154,7 @@ object UniversalFormat extends DeltaLogging {
snapshot: Snapshot,
newestProtocol: Protocol,
newestMetadata: Metadata,
isCreatingOrReorg: Boolean,
operation: Option[DeltaOperations.Operation],
actions: Seq[Action]): (Option[Protocol], Option[Metadata]) = {

val prevMetadata = snapshot.metadata
Expand Down Expand Up @@ -174,7 +203,7 @@ object UniversalFormat extends DeltaLogging {
snapshot,
newestProtocol = protocolToCheck,
newestMetadata = metadataToCheck,
isCreatingOrReorg,
operation,
actions
)
protocolToCheck = v1protocolUpdate.getOrElse(protocolToCheck)
Expand All @@ -185,7 +214,7 @@ object UniversalFormat extends DeltaLogging {
snapshot,
newestProtocol = protocolToCheck,
newestMetadata = metadataToCheck,
isCreatingOrReorg,
operation,
actions
)
changed ||= v2protocolUpdate.nonEmpty || v2metadataUpdate.nonEmpty
Expand Down Expand Up @@ -218,7 +247,7 @@ object UniversalFormat extends DeltaLogging {
snapshot,
newestProtocol = snapshot.protocol,
newestMetadata = metadata,
isCreatingOrReorgTable = true,
operation = None,
actions = Seq()
)

Expand Down Expand Up @@ -257,6 +286,7 @@ object UniversalFormat extends DeltaLogging {
}
}
}

/** Class to facilitate the conversion of Delta into other table formats. */
abstract class UniversalFormatConverter(spark: SparkSession) {
/**
Expand Down

0 comments on commit 7b35259

Please sign in to comment.