diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
index 191089640fc..f3ea5c279e5 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
@@ -255,7 +255,8 @@ case class CreateDeltaTableCommand(
     // exists (replacing table), otherwise it is handled inside WriteIntoDelta (creating table).
     if (!isV1Writer && isReplace && txn.readVersion > -1L) {
       val newDomainMetadata = Seq.empty[DomainMetadata] ++
-        ClusteredTableUtils.getDomainMetadataOptional(table, txn)
+        ClusteredTableUtils.getDomainMetadataFromTransaction(
+          ClusteredTableUtils.getClusterBySpecOptional(table), txn)
       // Ensure to remove any domain metadata for REPLACE TABLE.
       val newActions = taggedCommitData.actions ++
         DomainMetadataUtils.handleDomainMetadataForReplaceTable(
@@ -337,7 +338,8 @@ case class CreateDeltaTableCommand(
           protocol.foreach { protocol =>
             txn.updateProtocol(protocol)
           }
-          ClusteredTableUtils.getDomainMetadataOptional(table, txn).toSeq
+          ClusteredTableUtils.getDomainMetadataFromTransaction(
+            ClusteredTableUtils.getClusterBySpecOptional(table), txn).toSeq
         } else {
           verifyTableMetadata(txn, tableWithLocation)
           Nil
@@ -381,7 +383,8 @@ case class CreateDeltaTableCommand(
         actionsToCommit = removes ++
           DomainMetadataUtils.handleDomainMetadataForReplaceTable(
             txn.snapshot.domainMetadata,
-            ClusteredTableUtils.getDomainMetadataOptional(table, txn).toSeq)
+            ClusteredTableUtils.getDomainMetadataFromTransaction(
+              ClusteredTableUtils.getClusterBySpecOptional(table), txn).toSeq)
         actionsToCommit
       }
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala
index 64e62179ec5..5934cb9d3af 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala
@@ -181,7 +181,7 @@ trait ImplicitMetadataOperation extends DeltaLogging {
       clusterBySpecOpt: Option[ClusterBySpec] = None): Seq[DomainMetadata] = {
     if (canUpdateMetadata && (!txn.deltaLog.tableExists || isReplacingTable)) {
       val newDomainMetadata = Seq.empty[DomainMetadata] ++
-        ClusteredTableUtils.getDomainMetadataOptional(clusterBySpecOpt, txn)
+        ClusteredTableUtils.getDomainMetadataFromTransaction(clusterBySpecOpt, txn)
       if (!txn.deltaLog.tableExists) {
         newDomainMetadata
       } else {
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableUtils.scala
index 49ee836a2af..fe3e8b12633 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableUtils.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableUtils.scala
@@ -130,17 +130,29 @@ trait ClusteredTableUtilsBase extends DeltaLogging {
   }
 
   /**
-   * Create an optional [[DomainMetadata]] action to store clustering columns.
+   * Returns [[DomainMetadata]] actions to store clustering columns.
+   * If clusterBySpecOpt is not empty (clustering columns are specified by CLUSTER BY), it creates
+   * the domain metadata based on the clustering columns.
+   * Otherwise (CLUSTER BY is not specified for REPLACE TABLE), it creates the domain metadata
+   * with empty clustering columns if a clustering domain exists.
+   *
+   * This is used for CREATE TABLE and REPLACE TABLE.
    */
-  def getDomainMetadataOptional(
+  def getDomainMetadataFromTransaction(
       clusterBySpecOpt: Option[ClusterBySpec],
-      txn: OptimisticTransaction): Option[DomainMetadata] = {
+      txn: OptimisticTransaction): Seq[DomainMetadata] = {
     clusterBySpecOpt.map { clusterBy =>
       ClusteredTableUtils.validateClusteringColumnsInStatsSchema(
         txn.protocol, txn.metadata, clusterBy)
       val clusteringColumns =
         clusterBy.columnNames.map(_.toString).map(ClusteringColumn(txn.metadata.schema, _))
-      createDomainMetadata(clusteringColumns)
+      Some(createDomainMetadata(clusteringColumns)).toSeq
+    }.getOrElse {
+      if (txn.snapshot.domainMetadata.exists(_.domain == ClusteringMetadataDomain.domainName)) {
+        Some(createDomainMetadata(Seq.empty)).toSeq
+      } else {
+        None.toSeq
+      }
     }
   }
 
@@ -151,15 +163,6 @@ trait ClusteredTableUtilsBase extends DeltaLogging {
     ClusteringMetadataDomain.fromClusteringColumns(clusteringColumns).toDomainMetadata
   }
 
-  /**
-   * Create a [[ClusteringMetadataDomain]] with the given CatalogTable's clustering column property.
-   */
-  def getDomainMetadataOptional(
-      table: CatalogTable,
-      txn: OptimisticTransaction): Option[DomainMetadata] = {
-    getDomainMetadataOptional(getClusterBySpecOptional(table), txn)
-  }
-
   /**
    * Extract [[ClusteringColumn]]s from a given snapshot. Return None if the clustering domain
    * metadata is missing.
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala
index 18ebcba41fa..1a8a7274914 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala
@@ -61,7 +61,7 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession
     }
     val expectedClusteringColumns = logicalColumnNames.map(ClusteringColumn(snapshot.schema, _))
     val actualClusteringColumns =
-      ClusteredTableUtils.getClusteringColumnsOptional(snapshot).getOrElse(Seq.empty)
+      ClusteredTableUtils.getClusteringColumnsOptional(snapshot).orNull
     assert(expectedClusteringColumns == actualClusteringColumns)
   }
 
@@ -152,6 +152,8 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession
         } else {
          assertClusterByNotExist()
         }
+      case "WRITE" =>
+        doAssert(!lastOperationParameters.contains(CLUSTERING_PARAMETER_KEY))
       case _ =>
         // Other operations are not tested yet. If the test fails here, please check the expected
         // behavior and add the operation to the appropriate case.
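For readers scanning the patch, the behavior of the new getDomainMetadataFromTransaction can be summarized with the short, self-contained sketch below. It is only an illustration under stated assumptions, not the Delta implementation: DomainMetadata, ClusterBySpec, and Snapshot are stubbed out; ClusteringDomainSketch, the "delta.clustering" domain string, and the comma-separated configuration are hypothetical stand-ins for ClusteringMetadataDomain; the real code lives in ClusteredTableUtils.scala above.

// A hypothetical, simplified model of the new helper; real code is in ClusteredTableUtils.scala.
object ClusteringDomainSketch {
  // Stand-ins for Delta's DomainMetadata, ClusterBySpec and Snapshot.
  final case class DomainMetadata(domain: String, configuration: String, removed: Boolean)
  final case class ClusterBySpec(columnNames: Seq[String])
  final case class Snapshot(domainMetadata: Seq[DomainMetadata])

  // Assumed domain name; the real value comes from ClusteringMetadataDomain.domainName.
  val ClusteringDomainName = "delta.clustering"

  // The real configuration is JSON; a comma-separated list keeps the sketch short.
  def createDomainMetadata(clusteringColumns: Seq[String]): DomainMetadata =
    DomainMetadata(ClusteringDomainName, clusteringColumns.mkString(","), removed = false)

  // CLUSTER BY given: one action carrying the clustering columns.
  // CLUSTER BY absent: an action with empty columns, but only when the snapshot being
  // replaced already has a clustering domain; otherwise no action at all.
  def domainMetadataForCreateOrReplace(
      clusterBySpecOpt: Option[ClusterBySpec],
      snapshot: Snapshot): Seq[DomainMetadata] = clusterBySpecOpt match {
    case Some(spec) => Seq(createDomainMetadata(spec.columnNames))
    case None if snapshot.domainMetadata.exists(_.domain == ClusteringDomainName) =>
      Seq(createDomainMetadata(Seq.empty))
    case None => Seq.empty
  }

  def main(args: Array[String]): Unit = {
    val clustered = Snapshot(Seq(createDomainMetadata(Seq("a"))))
    // REPLACE without CLUSTER BY on a clustered table: domain kept, columns cleared.
    println(domainMetadataForCreateOrReplace(None, clustered))
    // REPLACE without CLUSTER BY on a never-clustered table: no clustering domain written.
    println(domainMetadataForCreateOrReplace(None, Snapshot(Seq.empty)))
  }
}

The key change from the removed getDomainMetadataOptional overloads is the CLUSTER BY-absent branch: REPLACE TABLE on a table that already carries a clustering domain now yields an action with empty clustering columns (clearing the columns while keeping the table feature, which the new DDL suite tests verify), whereas the old helper returned no action in that case.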
@@ -212,7 +214,8 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession
 
   def verifyClusteringColumns(
       tableIdentifier: TableIdentifier,
-      expectedLogicalClusteringColumns: String): Unit = {
+      expectedLogicalClusteringColumns: String
+    ): Unit = {
     val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, tableIdentifier)
     verifyClusteringColumnsInternal(
       snapshot,
@@ -235,12 +238,15 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession
 
   def verifyClusteringColumnsInternal(
       snapshot: Snapshot,
       tableNameOrPath: String,
-      expectedLogicalClusteringColumns: String): Unit = {
+      expectedLogicalClusteringColumns: String
+    ): Unit = {
     assert(ClusteredTableUtils.isSupported(snapshot.protocol) === true)
     verifyClusteringColumnsInDomainMetadata(snapshot, expectedLogicalClusteringColumns)
 
     // Verify Delta history operation parameters' clusterBy
-    verifyDescribeHistoryOperationParameters(tableNameOrPath)
+    verifyDescribeHistoryOperationParameters(
+      tableNameOrPath
+    )
   }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala
index 40b26c4ee87..12021587a1c 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala
@@ -398,6 +398,66 @@ trait ClusteredTableCreateOrReplaceDDLSuiteBase
     }
   }
 
+  test("Replace clustered table with non-clustered table") {
+    import testImplicits._
+    withTable(sourceTable) {
+      sql(s"CREATE TABLE $sourceTable(i int, s string) USING delta")
+      spark.range(1000)
+        .map(i => (i.intValue(), "string col"))
+        .toDF("i", "s")
+        .write
+        .format("delta")
+        .mode("append")
+        .saveAsTable(sourceTable)
+
+      // Validate REPLACE TABLE (AS SELECT).
+      Seq("REPLACE", "CREATE OR REPLACE").foreach { clause =>
+        withClusteredTable(testTable, "a int", "a") {
+          verifyClusteringColumns(TableIdentifier(testTable), "a")
+
+          Seq(true, false).foreach { isRTAS =>
+            val testQuery = if (isRTAS) {
+              s"$clause TABLE $testTable USING delta AS SELECT * FROM $sourceTable"
+            } else {
+              sql(s"$clause TABLE $testTable (i int, s string) USING delta")
+              s"INSERT INTO $testTable SELECT * FROM $sourceTable"
+            }
+            sql(testQuery)
+            // Note that the clustering table feature is still retained after REPLACE TABLE.
+            verifyClusteringColumns(TableIdentifier(testTable), "")
+          }
+        }
+      }
+    }
+  }
+
+  test("Replace clustered table with non-clustered table - dataframe writer") {
+    import testImplicits._
+    withTable(sourceTable) {
+      sql(s"CREATE TABLE $sourceTable(i int, s string) USING delta")
+      spark.range(1000)
+        .map(i => (i.intValue(), "string col"))
+        .toDF("i", "s")
+        .write
+        .format("delta")
+        .mode("append")
+        .saveAsTable(sourceTable)
+
+      withClusteredTable(testTable, "a int", "a") {
+        verifyClusteringColumns(TableIdentifier(testTable), "a")
+
+        spark.table(sourceTable)
+          .write
+          .format("delta")
+          .mode("overwrite")
+          .option("overwriteSchema", "true")
+          .saveAsTable(testTable)
+        // Note that the clustering table feature is still retained after REPLACE TABLE.
+        verifyClusteringColumns(TableIdentifier(testTable), "")
+      }
+    }
+  }
+
   protected def withTempDirIfNecessary(f: Option[String] => Unit): Unit = {
     if (isPathBased) {
       withTempDir { dir =>