
Commit 33debec

[Spark] Fix replacing clustered table with non-clustered table (delta-io#3175)
## Description

Fix replacing a clustered table with a non-clustered table by creating a domain metadata action with empty clustering columns.

## How was this patch tested?

New UTs.
1 parent d62f6b2 commit 33debec
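For context, the bug scenario looks roughly like this (a hedged sketch with an illustrative table name, mirroring the new UTs in ClusteredTableDDLSuite.scala below): replacing a clustered table without a `CLUSTER BY` clause should produce a non-clustered table, but previously no clustering domain metadata was written to override the old one.

```scala
// Illustrative repro (table name is hypothetical); mirrors the new tests below.
spark.sql("CREATE TABLE tbl (a INT) USING delta CLUSTER BY (a)")

// REPLACE without CLUSTER BY: the result should be a non-clustered table.
spark.sql("REPLACE TABLE tbl (i INT, s STRING) USING delta")

// Before this fix, the old clustering domain metadata survived the replace;
// after it, the domain is rewritten with empty clustering columns.
```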

File tree

5 files changed: +93 −21


spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala

+6 −3

@@ -255,7 +255,8 @@ case class CreateDeltaTableCommand(
       // exists (replacing table), otherwise it is handled inside WriteIntoDelta (creating table).
       if (!isV1Writer && isReplace && txn.readVersion > -1L) {
         val newDomainMetadata = Seq.empty[DomainMetadata] ++
-          ClusteredTableUtils.getDomainMetadataOptional(table, txn)
+          ClusteredTableUtils.getDomainMetadataFromTransaction(
+            ClusteredTableUtils.getClusterBySpecOptional(table), txn)
         // Ensure to remove any domain metadata for REPLACE TABLE.
         val newActions = taggedCommitData.actions ++
           DomainMetadataUtils.handleDomainMetadataForReplaceTable(
@@ -337,7 +338,8 @@ case class CreateDeltaTableCommand(
       protocol.foreach { protocol =>
         txn.updateProtocol(protocol)
       }
-      ClusteredTableUtils.getDomainMetadataOptional(table, txn).toSeq
+      ClusteredTableUtils.getDomainMetadataFromTransaction(
+        ClusteredTableUtils.getClusterBySpecOptional(table), txn).toSeq
     } else {
       verifyTableMetadata(txn, tableWithLocation)
       Nil
@@ -381,7 +383,8 @@ case class CreateDeltaTableCommand(
       actionsToCommit = removes ++
         DomainMetadataUtils.handleDomainMetadataForReplaceTable(
           txn.snapshot.domainMetadata,
-          ClusteredTableUtils.getDomainMetadataOptional(table, txn).toSeq)
+          ClusteredTableUtils.getDomainMetadataFromTransaction(
+            ClusteredTableUtils.getClusterBySpecOptional(table), txn).toSeq)
       actionsToCommit
     }

spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala

+1 −1

@@ -181,7 +181,7 @@ trait ImplicitMetadataOperation extends DeltaLogging {
       clusterBySpecOpt: Option[ClusterBySpec] = None): Seq[DomainMetadata] = {
     if (canUpdateMetadata && (!txn.deltaLog.tableExists || isReplacingTable)) {
       val newDomainMetadata = Seq.empty[DomainMetadata] ++
-        ClusteredTableUtils.getDomainMetadataOptional(clusterBySpecOpt, txn)
+        ClusteredTableUtils.getDomainMetadataFromTransaction(clusterBySpecOpt, txn)
       if (!txn.deltaLog.tableExists) {
         newDomainMetadata
       } else {

spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableUtils.scala

+16 −13

@@ -130,17 +130,29 @@ trait ClusteredTableUtilsBase extends DeltaLogging {
   }

   /**
-   * Create an optional [[DomainMetadata]] action to store clustering columns.
+   * Returns a [[DomainMetadata]] action to store clustering columns.
+   * If clusterBySpecOpt is not empty (clustering columns are specified by CLUSTER BY), it creates
+   * the domain metadata based on the clustering columns.
+   * Otherwise (CLUSTER BY is not specified for REPLACE TABLE), it creates the domain metadata
+   * with empty clustering columns if a clustering domain exists.
+   *
+   * This is used for CREATE TABLE and REPLACE TABLE.
    */
-  def getDomainMetadataOptional(
+  def getDomainMetadataFromTransaction(
       clusterBySpecOpt: Option[ClusterBySpec],
-      txn: OptimisticTransaction): Option[DomainMetadata] = {
+      txn: OptimisticTransaction): Seq[DomainMetadata] = {
     clusterBySpecOpt.map { clusterBy =>
       ClusteredTableUtils.validateClusteringColumnsInStatsSchema(
         txn.protocol, txn.metadata, clusterBy)
       val clusteringColumns =
         clusterBy.columnNames.map(_.toString).map(ClusteringColumn(txn.metadata.schema, _))
-      createDomainMetadata(clusteringColumns)
+      Some(createDomainMetadata(clusteringColumns)).toSeq
+    }.getOrElse {
+      if (txn.snapshot.domainMetadata.exists(_.domain == ClusteringMetadataDomain.domainName)) {
+        Some(createDomainMetadata(Seq.empty)).toSeq
+      } else {
+        None.toSeq
+      }
     }
   }

@@ -151,15 +163,6 @@ trait ClusteredTableUtilsBase extends DeltaLogging {
     ClusteringMetadataDomain.fromClusteringColumns(clusteringColumns).toDomainMetadata
   }

-  /**
-   * Create a [[ClusteringMetadataDomain]] with the given CatalogTable's clustering column property.
-   */
-  def getDomainMetadataOptional(
-      table: CatalogTable,
-      txn: OptimisticTransaction): Option[DomainMetadata] = {
-    getDomainMetadataOptional(getClusterBySpecOptional(table), txn)
-  }
-
   /**
    * Extract [[ClusteringColumn]]s from a given snapshot. Return None if the clustering domain
    * metadata is missing.
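The rewritten getDomainMetadataFromTransaction boils down to a three-way decision. The following is a minimal standalone sketch of that logic, with simplified stand-in types (not the real Delta Lake DomainMetadata, ClusterBySpec, or ClusteringMetadataDomain):

```scala
// Simplified stand-in for Delta's DomainMetadata action (illustration only).
case class DomainMetadata(domain: String, configuration: String, removed: Boolean)

object ClusteringDomainModel {
  val clusteringDomainName = "delta.clustering" // stand-in domain name

  def domainMetadataFromTransaction(
      clusterBySpecOpt: Option[Seq[String]],
      existingDomains: Seq[DomainMetadata]): Seq[DomainMetadata] = {
    clusterBySpecOpt match {
      // CLUSTER BY was specified: store the given clustering columns.
      case Some(columns) =>
        Seq(DomainMetadata(clusteringDomainName, columns.mkString(","), removed = false))
      // No CLUSTER BY, but the table being replaced has a clustering domain:
      // overwrite it with empty clustering columns (the fix in this commit).
      case None if existingDomains.exists(_.domain == clusteringDomainName) =>
        Seq(DomainMetadata(clusteringDomainName, "", removed = false))
      // No CLUSTER BY and no existing clustering domain: nothing to write.
      case None =>
        Seq.empty
    }
  }
}
```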

spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala

+10 −4

@@ -61,7 +61,7 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession
     }
     val expectedClusteringColumns = logicalColumnNames.map(ClusteringColumn(snapshot.schema, _))
     val actualClusteringColumns =
-      ClusteredTableUtils.getClusteringColumnsOptional(snapshot).getOrElse(Seq.empty)
+      ClusteredTableUtils.getClusteringColumnsOptional(snapshot).orNull
     assert(expectedClusteringColumns == actualClusteringColumns)
   }

@@ -152,6 +152,8 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession
       } else {
         assertClusterByNotExist()
       }
+    case "WRITE" =>
+      doAssert(!lastOperationParameters.contains(CLUSTERING_PARAMETER_KEY))
     case _ =>
       // Other operations are not tested yet. If the test fails here, please check the expected
       // behavior and add the operation to the appropriate case.
@@ -212,7 +214,8 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession

   def verifyClusteringColumns(
       tableIdentifier: TableIdentifier,
-      expectedLogicalClusteringColumns: String): Unit = {
+      expectedLogicalClusteringColumns: String
+  ): Unit = {
     val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, tableIdentifier)
     verifyClusteringColumnsInternal(
       snapshot,
@@ -235,12 +238,15 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession
   def verifyClusteringColumnsInternal(
       snapshot: Snapshot,
       tableNameOrPath: String,
-      expectedLogicalClusteringColumns: String): Unit = {
+      expectedLogicalClusteringColumns: String
+  ): Unit = {
     assert(ClusteredTableUtils.isSupported(snapshot.protocol) === true)
     verifyClusteringColumnsInDomainMetadata(snapshot, expectedLogicalClusteringColumns)

     // Verify Delta history operation parameters' clusterBy
-    verifyDescribeHistoryOperationParameters(tableNameOrPath)
+    verifyDescribeHistoryOperationParameters(
+      tableNameOrPath
+    )
   }
 }
246252

spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala

+60 −0

@@ -398,6 +398,66 @@ trait ClusteredTableCreateOrReplaceDDLSuiteBase
     }
   }

+  test("Replace clustered table with non-clustered table") {
+    import testImplicits._
+    withTable(sourceTable) {
+      sql(s"CREATE TABLE $sourceTable(i int, s string) USING delta")
+      spark.range(1000)
+        .map(i => (i.intValue(), "string col"))
+        .toDF("i", "s")
+        .write
+        .format("delta")
+        .mode("append")
+        .saveAsTable(sourceTable)
+
+      // Validate REPLACE TABLE (AS SELECT).
+      Seq("REPLACE", "CREATE OR REPLACE").foreach { clause =>
+        withClusteredTable(testTable, "a int", "a") {
+          verifyClusteringColumns(TableIdentifier(testTable), "a")
+
+          Seq(true, false).foreach { isRTAS =>
+            val testQuery = if (isRTAS) {
+              s"$clause TABLE $testTable USING delta AS SELECT * FROM $sourceTable"
+            } else {
+              sql(s"$clause TABLE $testTable (i int, s string) USING delta")
+              s"INSERT INTO $testTable SELECT * FROM $sourceTable"
+            }
+            sql(testQuery)
+            // Note that the clustering table feature is still retained after REPLACE TABLE.
+            verifyClusteringColumns(TableIdentifier(testTable), "")
+          }
+        }
+      }
+    }
+  }
+
+  test("Replace clustered table with non-clustered table - dataframe writer") {
+    import testImplicits._
+    withTable(sourceTable) {
+      sql(s"CREATE TABLE $sourceTable(i int, s string) USING delta")
+      spark.range(1000)
+        .map(i => (i.intValue(), "string col"))
+        .toDF("i", "s")
+        .write
+        .format("delta")
+        .mode("append")
+        .saveAsTable(sourceTable)
+
+      withClusteredTable(testTable, "a int", "a") {
+        verifyClusteringColumns(TableIdentifier(testTable), "a")
+
+        spark.table(sourceTable)
+          .write
+          .format("delta")
+          .mode("overwrite")
+          .option("overwriteSchema", "true")
+          .saveAsTable(testTable)
+        // Note that the clustering table feature is still retained after REPLACE TABLE.
+        verifyClusteringColumns(TableIdentifier(testTable), "")
+      }
+    }
+  }
+
   protected def withTempDirIfNecessary(f: Option[String] => Unit): Unit = {
     if (isPathBased) {
       withTempDir { dir =>