Skip to content

Commit

Permalink
[Spark][3.2] Fix replacing clustered table with non-clustered table (#3195)
Browse files Browse the repository at this point in the history

## Description
Fix replacing clustered table with non-clustered table, by creating a
domain metadata with empty clustering columns.

Cherry-picked from 59f8c64

## How was this patch tested?
New unit tests
  • Loading branch information
zedtang authored Jun 4, 2024
1 parent d62f6b2 commit 9081a4c
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ case class CreateDeltaTableCommand(
// exists (replacing table), otherwise it is handled inside WriteIntoDelta (creating table).
if (!isV1Writer && isReplace && txn.readVersion > -1L) {
val newDomainMetadata = Seq.empty[DomainMetadata] ++
ClusteredTableUtils.getDomainMetadataOptional(table, txn)
ClusteredTableUtils.getDomainMetadataFromTransaction(
ClusteredTableUtils.getClusterBySpecOptional(table), txn)
// Ensure to remove any domain metadata for REPLACE TABLE.
val newActions = taggedCommitData.actions ++
DomainMetadataUtils.handleDomainMetadataForReplaceTable(
Expand Down Expand Up @@ -337,7 +338,8 @@ case class CreateDeltaTableCommand(
protocol.foreach { protocol =>
txn.updateProtocol(protocol)
}
ClusteredTableUtils.getDomainMetadataOptional(table, txn).toSeq
ClusteredTableUtils.getDomainMetadataFromTransaction(
ClusteredTableUtils.getClusterBySpecOptional(table), txn).toSeq
} else {
verifyTableMetadata(txn, tableWithLocation)
Nil
Expand Down Expand Up @@ -381,7 +383,8 @@ case class CreateDeltaTableCommand(
actionsToCommit = removes ++
DomainMetadataUtils.handleDomainMetadataForReplaceTable(
txn.snapshot.domainMetadata,
ClusteredTableUtils.getDomainMetadataOptional(table, txn).toSeq)
ClusteredTableUtils.getDomainMetadataFromTransaction(
ClusteredTableUtils.getClusterBySpecOptional(table), txn).toSeq)
actionsToCommit
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ trait ImplicitMetadataOperation extends DeltaLogging {
clusterBySpecOpt: Option[ClusterBySpec] = None): Seq[DomainMetadata] = {
if (canUpdateMetadata && (!txn.deltaLog.tableExists || isReplacingTable)) {
val newDomainMetadata = Seq.empty[DomainMetadata] ++
ClusteredTableUtils.getDomainMetadataOptional(clusterBySpecOpt, txn)
ClusteredTableUtils.getDomainMetadataFromTransaction(clusterBySpecOpt, txn)
if (!txn.deltaLog.tableExists) {
newDomainMetadata
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,29 @@ trait ClusteredTableUtilsBase extends DeltaLogging {
}

/**
 * Returns the [[DomainMetadata]] actions needed to store clustering columns.
 *
 * If `clusterBySpecOpt` is non-empty (clustering columns are specified by CLUSTER BY), the
 * domain metadata is built from those columns after validating them against the stats schema.
 * Otherwise (CLUSTER BY is not specified, e.g. REPLACE TABLE with a non-clustered definition),
 * a domain metadata with empty clustering columns is emitted — but only if the snapshot
 * already carries a clustering domain; a table that was never clustered gets nothing.
 *
 * This is used for CREATE TABLE and REPLACE TABLE.
 *
 * @param clusterBySpecOpt the CLUSTER BY spec from the statement, if any
 * @param txn the transaction whose protocol, metadata and snapshot are consulted
 */
def getDomainMetadataFromTransaction(
    clusterBySpecOpt: Option[ClusterBySpec],
    txn: OptimisticTransaction): Seq[DomainMetadata] = {
  clusterBySpecOpt match {
    case Some(clusterBy) =>
      // CLUSTER BY present: validate and record the requested clustering columns.
      ClusteredTableUtils.validateClusteringColumnsInStatsSchema(
        txn.protocol, txn.metadata, clusterBy)
      val clusteringColumns =
        clusterBy.columnNames.map(_.toString).map(ClusteringColumn(txn.metadata.schema, _))
      Seq(createDomainMetadata(clusteringColumns))
    case None =>
      // No CLUSTER BY: if a clustering domain already exists (REPLACE of a clustered
      // table), overwrite it with empty clustering columns; otherwise emit nothing.
      if (txn.snapshot.domainMetadata.exists(_.domain == ClusteringMetadataDomain.domainName)) {
        Seq(createDomainMetadata(Seq.empty))
      } else {
        Seq.empty
      }
  }
}

Expand All @@ -151,15 +163,6 @@ trait ClusteredTableUtilsBase extends DeltaLogging {
ClusteringMetadataDomain.fromClusteringColumns(clusteringColumns).toDomainMetadata
}

/**
 * Create an optional [[DomainMetadata]] action from the given CatalogTable's clustering
 * column property, by delegating to the [[ClusterBySpec]]-based overload.
 * Returns None when the table carries no clustering column property.
 */
def getDomainMetadataOptional(
table: CatalogTable,
txn: OptimisticTransaction): Option[DomainMetadata] = {
getDomainMetadataOptional(getClusterBySpecOptional(table), txn)
}

/**
* Extract [[ClusteringColumn]]s from a given snapshot. Return None if the clustering domain
* metadata is missing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession
}
val expectedClusteringColumns = logicalColumnNames.map(ClusteringColumn(snapshot.schema, _))
val actualClusteringColumns =
ClusteredTableUtils.getClusteringColumnsOptional(snapshot).getOrElse(Seq.empty)
ClusteredTableUtils.getClusteringColumnsOptional(snapshot).orNull
assert(expectedClusteringColumns == actualClusteringColumns)
}

Expand Down Expand Up @@ -152,6 +152,8 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession
} else {
assertClusterByNotExist()
}
case "WRITE" =>
doAssert(!lastOperationParameters.contains(CLUSTERING_PARAMETER_KEY))
case _ =>
// Other operations are not tested yet. If the test fails here, please check the expected
// behavior and add the operation to the appropriate case.
Expand Down Expand Up @@ -212,7 +214,8 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession

def verifyClusteringColumns(
tableIdentifier: TableIdentifier,
expectedLogicalClusteringColumns: String): Unit = {
expectedLogicalClusteringColumns: String
): Unit = {
val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, tableIdentifier)
verifyClusteringColumnsInternal(
snapshot,
Expand All @@ -235,12 +238,15 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession
/**
 * Verifies the snapshot's clustering columns, both in the clustering domain metadata
 * and in the DESCRIBE HISTORY operation parameters.
 *
 * @param snapshot the snapshot to check; its protocol must support clustered tables
 * @param tableNameOrPath table name or path used to look up Delta history
 * @param expectedLogicalClusteringColumns expected logical clustering columns
 */
def verifyClusteringColumnsInternal(
    snapshot: Snapshot,
    tableNameOrPath: String,
    expectedLogicalClusteringColumns: String
): Unit = {
  // The protocol must advertise clustering support before anything else is checked.
  assert(ClusteredTableUtils.isSupported(snapshot.protocol) === true)
  verifyClusteringColumnsInDomainMetadata(snapshot, expectedLogicalClusteringColumns)

  // Verify Delta history operation parameters' clusterBy.
  verifyDescribeHistoryOperationParameters(tableNameOrPath)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,66 @@ trait ClusteredTableCreateOrReplaceDDLSuiteBase
}
}

test("Replace clustered table with non-clustered table") {
  import testImplicits._
  withTable(sourceTable) {
    sql(s"CREATE TABLE $sourceTable(i int, s string) USING delta")
    spark.range(1000)
      .map(i => (i.intValue(), "string col"))
      .toDF("i", "s")
      .write
      .format("delta")
      .mode("append")
      .saveAsTable(sourceTable)

    // Exercise both REPLACE TABLE and CREATE OR REPLACE TABLE (AS SELECT).
    for (clause <- Seq("REPLACE", "CREATE OR REPLACE")) {
      withClusteredTable(testTable, "a int", "a") {
        verifyClusteringColumns(TableIdentifier(testTable), "a")

        for (isRTAS <- Seq(true, false)) {
          val replaceQuery =
            if (isRTAS) {
              s"$clause TABLE $testTable USING delta AS SELECT * FROM $sourceTable"
            } else {
              sql(s"$clause TABLE $testTable (i int, s string) USING delta")
              s"INSERT INTO $testTable SELECT * FROM $sourceTable"
            }
          sql(replaceQuery)
          // After the REPLACE, the clustering table feature is still enabled but the
          // clustering columns must now be empty.
          verifyClusteringColumns(TableIdentifier(testTable), "")
        }
      }
    }
  }
}

test("Replace clustered table with non-clustered table - dataframe writer") {
  import testImplicits._
  withTable(sourceTable) {
    sql(s"CREATE TABLE $sourceTable(i int, s string) USING delta")
    val sourceData = spark.range(1000)
      .map(i => (i.intValue(), "string col"))
      .toDF("i", "s")
    sourceData.write
      .format("delta")
      .mode("append")
      .saveAsTable(sourceTable)

    withClusteredTable(testTable, "a int", "a") {
      verifyClusteringColumns(TableIdentifier(testTable), "a")

      // Overwrite the clustered table through the DataFrame writer, which performs a
      // replace without any CLUSTER BY spec.
      spark.table(sourceTable)
        .write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(testTable)
      // The clustering table feature remains enabled after the replace, but the
      // clustering columns must now be empty.
      verifyClusteringColumns(TableIdentifier(testTable), "")
    }
  }
}

protected def withTempDirIfNecessary(f: Option[String] => Unit): Unit = {
if (isPathBased) {
withTempDir { dir =>
Expand Down

0 comments on commit 9081a4c

Please sign in to comment.