From 456bd53687d408d0ddec6e4e9df4b5732aa398ba Mon Sep 17 00:00:00 2001
From: Venki Korukanti
Date: Wed, 3 May 2023 23:37:46 -0700
Subject: [PATCH 1/2] Remove blocking writes to Delta tables with deletion vectors

---
 .../sql/delta/OptimisticTransaction.scala     |   2 -
 .../delta/commands/DeletionVectorUtils.scala  |  37 ----
 .../sql/delta/commands/VacuumCommand.scala    |   3 -
 .../sql/delta/sources/DeltaSQLConf.scala      |   9 -
 ...DisableUpdatesToDvEnabledTablesSuite.scala | 186 ------------------
 5 files changed, 237 deletions(-)
 delete mode 100644 core/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DisableUpdatesToDvEnabledTablesSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index a952afcfdb4..4c46a81549f 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -139,7 +139,6 @@ class OptimisticTransaction
     (implicit override val clock: Clock)
   extends OptimisticTransactionImpl
   with DeltaLogging {
-  DeletionVectorUtils.assertDeletionVectorsNotReadable(spark, snapshot.metadata, snapshot.protocol)
 
   /** Creates a new OptimisticTransaction.
    *
@@ -552,7 +551,6 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     newMetadataTmp = RowId.verifyAndUpdateMetadata(
       spark, protocol, snapshot.metadata, newMetadataTmp, isCreatingNewTable)
 
-    DeletionVectorUtils.assertDeletionVectorsNotEnabled(spark, newMetadataTmp, protocol)
     assertMetadata(newMetadataTmp)
     logInfo(s"Updated metadata from ${newMetadata.getOrElse("-")} to $newMetadataTmp")
     newMetadata = Some(newMetadataTmp)
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/DeletionVectorUtils.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/DeletionVectorUtils.scala
index 03c0e276cba..2a784b5c261 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/DeletionVectorUtils.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/DeletionVectorUtils.scala
@@ -85,43 +85,6 @@ trait DeletionVectorUtils {
     protocol.isFeatureSupported(DeletionVectorsTableFeature) &&
       metadata.format.provider == "parquet" // DVs are only supported on parquet tables.
   }
-
-  /**
-   * Utility method that checks the table has no Deletion Vectors enabled. Deletion vectors
-   * are supported in read-only mode for now. Any updates to tables with deletion vectors
-   * feature are disabled until we add support.
-   */
-  def assertDeletionVectorsNotReadable(
-      spark: SparkSession, metadata: Metadata, protocol: Protocol): Unit = {
-    val disable =
-      Utils.isTesting && // We are in testing and enabled blocking updates on DV tables
-        spark.conf.get(DeltaSQLConf.DELTA_ENABLE_BLOCKING_UPDATES_ON_DV_TABLES)
-    if (!disable && deletionVectorsReadable(protocol, metadata)) {
-      throw new UnsupportedOperationException(
-        "Updates to tables with Deletion Vectors feature enabled are not supported in " +
-          "this version of Delta Lake.")
-    }
-  }
-
-  /**
-   * Utility method that checks the table metadata has no deletion vectors enabled. Deletion vectors
-   * are supported in read-only mode for now. Any updates to metadata to enable deletion vectors are
-   * blocked until we add support.
-   */
-  def assertDeletionVectorsNotEnabled(
-      spark: SparkSession, metadata: Metadata, protocol: Protocol): Unit = {
-    val disable =
-      Utils.isTesting && // We are in testing and enabled blocking updates on DV tables
-        spark.conf.get(DeltaSQLConf.DELTA_ENABLE_BLOCKING_UPDATES_ON_DV_TABLES)
-    if (!disable &&
-      (protocol.isFeatureSupported(DeletionVectorsTableFeature) ||
-        DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.fromMetaData(metadata)
-      )
-    ) {
-      throw new UnsupportedOperationException(
-        "Enabling Deletion Vectors on the table is not supported in this version of Delta Lake.")
-    }
-  }
 }
 
 // To access utilities from places where mixing in a trait is inconvenient.
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index f9a7495b7c1..d9bb8dc543c 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -114,9 +114,6 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
 
     require(snapshot.version >= 0, "No state defined for this table. Is this really " +
       "a Delta table? Refusing to garbage collect.")
-    DeletionVectorUtils.assertDeletionVectorsNotReadable(
-      spark, snapshot.metadata, snapshot.protocol)
-
     val snapshotTombstoneRetentionMillis = DeltaLog.tombstoneRetentionMillis(snapshot.metadata)
     val retentionMillis = retentionHours.map(h => TimeUnit.HOURS.toMillis(math.round(h)))
     checkRetentionPeriodSafety(spark, retentionMillis, snapshotTombstoneRetentionMillis)
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 3e4c9d445c7..7fdc74f7adf 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1157,15 +1157,6 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(true)
 
-  val DELTA_ENABLE_BLOCKING_UPDATES_ON_DV_TABLES =
-    buildConf("deletionVectors.updates.blocking.enabled")
-      .internal()
-      .doc(
-        """Enable blocking updates on tables with Deletion Vectors
-          |Only change this for testing!""".stripMargin)
-      .booleanConf
-      .createWithDefault(true)
-
   val DELTA_DUPLICATE_ACTION_CHECK_ENABLED =
     buildConf("duplicateActionCheck.enabled")
       .internal()
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DisableUpdatesToDvEnabledTablesSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DisableUpdatesToDvEnabledTablesSuite.scala
deleted file mode 100644
index b871bf661bd..00000000000
--- a/core/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DisableUpdatesToDvEnabledTablesSuite.scala
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Copyright (2021) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.delta.deletionvectors
-
-import java.io.File
-import java.lang
-
-import org.apache.hadoop.fs.Path
-
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.delta.{DeletionVectorsTestUtils, DeltaTestUtilsForTempViews}
-import org.apache.spark.sql.delta.deletionvectors.DeletionVectorsSuite._
-import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
-import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
-import org.apache.spark.sql.test.SharedSparkSession
-
-/**
- * Test suite for testing all write commands are disabled on tables with deletion vectors.
- * This is a temporary behavior until we properly implement and test write support on
- * tables with deletion vectors.
- */
-class DisableUpdatesToDvEnabledTablesSuite extends QueryTest
-  with SharedSparkSession
-  with DeletionVectorsTestUtils
-  with DeltaSQLCommandTest
-  with DeltaTestUtilsForTempViews {
-
-  import io.delta.implicits._
-
-  test("DELETE is blocked on table with DVs") {
-    assertDVTableUpdatesAreDisabled(testTablePath = Some(table1WithDVs)) { _ =>
-      spark.sql(s"DELETE FROM $table2WithDVs WHERE value in (2, 5, 7)")
-    }
-  }
-
-  test("MERGE is blocked on table with DVs") {
-    assertDVTableUpdatesAreDisabled(testTablePath = Some(table1WithDVs)) { _ =>
-      spark.sql(s"MERGE INTO $table1WithDVs t USING (SELECT * FROM $table2WithDVs) s " +
-        s"ON t.value = s.value WHEN MATCHED THEN DELETE ")
-    }
-  }
-
-  test("UPDATE is blocked on table with DVs") {
-    assertDVTableUpdatesAreDisabled(testTablePath = Some(table1WithDVs)) { _ =>
-      spark.sql(s"UPDATE $table2WithDVs SET value = 3 WHERE value > 0")
-    }
-  }
-
-  test("INSERT INTO is blocked on table with DVs") {
-    assertDVTableUpdatesAreDisabled(testTablePath = Some(table1WithDVs)) { _ =>
-      spark.sql(s"INSERT INTO $table2WithDVs SELECT 200")
-    }
-  }
-
-  test("INSERT INTO is blocked on table with DV feature supported, but no DVs") {
-    assertDVTableUpdatesAreDisabled(testTablePath = Some(table4WithDVFeatureSupported)) { _ =>
-      spark.sql(s"INSERT INTO $table4WithDVFeatureSupported SELECT 200L")
-    }
-  }
-
-  test("REPLACE OVERWRITE is blocked on table with DVs") {
-    assertDVTableUpdatesAreDisabled(testTablePath = Some(table1WithDVs)) { _ =>
-      spark.sql(s"INSERT OVERWRITE $table1WithDVs SELECT * FROM $table2WithDVs")
-    }
-  }
-
-  test("OPTIMIZE is blocked on table with DVs") {
-    assertDVTableUpdatesAreDisabled(testTablePath = Some(table1WithDVs)) { _ =>
-      spark.sql(s"OPTIMIZE $table2WithDVs")
-    }
-  }
-
-  test("RESTORE is blocked on table with DVs") {
-    assertDVTableUpdatesAreDisabled(testTablePath = Some(table1WithDVs)) { _ =>
-      spark.sql(s"RESTORE $table2WithDVs TO VERSION AS OF 0")
-    }
-  }
-
-  for (enableLogging <- BOOLEAN_DOMAIN)
-    test(s"VACUUM is blocked on table with DVs with logging enabled=$enableLogging") {
-      assertDVTableUpdatesAreDisabled(testTablePath = Some(table1WithDVs)) { _ =>
-        withSQLConf(
-          DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED.key -> "false",
-          // Logging influencing whether a transaction is committed to DeltaLog or not
-          DeltaSQLConf.DELTA_VACUUM_LOGGING_ENABLED.key -> enableLogging.toString) {
-          spark.sql(s"VACUUM $table1WithDVs RETAIN 0 HOURS")
-        }
-      }
-    }
-
-  test("CLONE is blocked on table with DVs") {
-    assertDVTableUpdatesAreDisabled(testTablePath = Some(table2WithDVs)) { tablePath =>
-      spark.sql(s"CREATE TABLE delta.`$tablePath` SHALLOW CLONE $table2WithDVs")
-    }
-  }
-
-  test("CREATE TABLE with DVs is blocked") {
-    assertDVTableUpdatesAreDisabled(testTablePath = None) { tablePath =>
-      withDeletionVectorsEnabled() {
-        createTempTable(tablePath)
-      }
-    }
-  }
-
-  test("CREATE TABLE with DV feature enabled is blocked") {
-    assertDVTableUpdatesAreDisabled(testTablePath = None) { tablePath =>
-      withTable("tab") {
-        spark.sql(s"CREATE TABLE tab (c1 int) USING DELTA " +
-          "TBLPROPERTIES ('delta.feature.deletionVectors' = 'supported');")
-      }
-    }
-  }
-
-  test("ALTER TABLE to add DV feature is blocked") {
-    assertDVTableUpdatesAreDisabled(testTablePath = None) { tablePath =>
-      withTable("tab") {
-        spark.sql("CREATE TABLE tab (c1 int) USING DELTA;")
-        spark.sql("ALTER TABLE tab SET " +
-          "TBLPROPERTIES ('delta.feature.deletionVectors' = 'supported');")
-      }
-    }
-  }
-
-  test("Enabling DV feature on a table is blocked") {
-    assertDVTableUpdatesAreDisabled(testTablePath = None) { tablePath =>
-      createTempTable(tablePath)
-      enableDeletionVectorsInTable(new Path(tablePath), enable = true)
-    }
-  }
-
-  def assertDVTableUpdatesAreDisabled(testTablePath: Option[String])(f: String => Unit): Unit = {
-    val dataBefore = testTablePath.map(path => spark.sql(s"SELECT * FROM $path"))
-    val ex = intercept[UnsupportedOperationException] {
-      withTempPath { path =>
-        f(path.getAbsolutePath)
-      }
-    }
-    assert(ex.isInstanceOf[UnsupportedOperationException])
-    val msg = ex.getMessage
-    assert(
-      msg.contains("Updates to tables with Deletion Vectors feature enabled are " +
-        "not supported in this version of Delta Lake.") |
-      msg.contains("Enabling Deletion Vectors on the table is not supported in this " +
-        "version of Delta Lake."))
-
-    val dataAfter = testTablePath.map(path => spark.sql(s"SELECT * FROM $path"))
-    if (testTablePath.isDefined) {
-      checkAnswer(dataAfter.get, dataBefore.get)
-    }
-  }
-
-  private def createTempTable(path: String): Unit = {
-    spark.range(end = 100L).toDF("id").coalesce(1)
-      .write.format("delta").mode("overwrite").save(path)
-  }
-
-  protected override def beforeAll(): Unit = {
-    super.beforeAll()
-    spark.sessionState.conf.setConf(
-      DeltaSQLConf.DELTA_ENABLE_BLOCKING_UPDATES_ON_DV_TABLES, false)
-  }
-
-  protected override def afterAll(): Unit = {
-    spark.sessionState.conf.setConf(
-      DeltaSQLConf.DELTA_ENABLE_BLOCKING_UPDATES_ON_DV_TABLES, true)
-    super.afterAll()
-  }
-
-  private val table2WithDVs = s"delta.`${new File(table2Path).getAbsolutePath}`"
-  private val table1WithDVs = s"delta.`${new File(table1Path).getAbsolutePath}`"
-  private val table4WithDVFeatureSupported = s"delta.`${new File(table4Path).getAbsolutePath}`"
-}

From 3880aa1e7f09d7990b9d8987b3c8f200c556a28a Mon Sep 17 00:00:00 2001
From: Venki Korukanti
Date: Thu, 4 May 2023 12:19:41 -0700
Subject: [PATCH 2/2] review

---
 .../spark/sql/delta/commands/DeleteCommand.scala  |   7 ++-----
 .../_delta_log/00000000000000000000.json          |   4 ----
 ...858-4d61-ba30-9549910bc73d-c000.snappy.parquet | Bin 609 -> 0 bytes
 .../deletionvectors/DeletionVectorsSuite.scala    |   4 ----
 4 files changed, 2 insertions(+), 13 deletions(-)
 delete mode 100644 core/src/test/resources/delta/table-with-dv-feature-enabled/_delta_log/00000000000000000000.json
 delete mode 100644 core/src/test/resources/delta/table-with-dv-feature-enabled/part-00000-fe8a3447-d858-4d61-ba30-9549910bc73d-c000.snappy.parquet

diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
index 89df2cd5d95..8fb6d6fcae8 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
@@ -22,7 +22,6 @@ import org.apache.spark.sql.delta.commands.DeleteCommand.{rewritingFilesMsg, FIN
 import org.apache.spark.sql.delta.commands.MergeIntoCommand.totalBytesAndDistinctPartitionValues
 import org.apache.spark.sql.delta.files.TahoeBatchFileIndex
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.delta.util.Utils
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 
 import org.apache.spark.SparkContext
@@ -440,10 +439,8 @@ case class DeleteCommand(
 
   def shouldWritePersistentDeletionVectors(
       spark: SparkSession, txn: OptimisticTransaction): Boolean = {
-    // DELETE with DVs only enabled for tests.
-    Utils.isTesting &&
-      spark.conf.get(DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS) &&
-      DeletionVectorUtils.deletionVectorsWritable(txn.snapshot)
+    spark.conf.get(DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS) &&
+      DeletionVectorUtils.deletionVectorsWritable(txn.snapshot)
   }
 }
diff --git a/core/src/test/resources/delta/table-with-dv-feature-enabled/_delta_log/00000000000000000000.json b/core/src/test/resources/delta/table-with-dv-feature-enabled/_delta_log/00000000000000000000.json
deleted file mode 100644
index 5ac362dd2cd..00000000000
--- a/core/src/test/resources/delta/table-with-dv-feature-enabled/_delta_log/00000000000000000000.json
+++ /dev/null
@@ -1,4 +0,0 @@
-{"commitInfo":{"timestamp":1678403656146,"operation":"CREATE TABLE AS SELECT","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{}"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"609"},"engineInfo":"","txnId":"9ce289f2-84a0-4cd6-918f-139376c1d897"}}
-{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}
-{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"x\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1678403653515}}
-{"add":{"path":"part-00000-fe8a3447-d858-4d61-ba30-9549910bc73d-c000.snappy.parquet","partitionValues":{},"size":609,"modificationTime":1678403655890,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"x\":1},\"maxValues\":{\"x\":1},\"nullCount\":{\"x\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1678403655890000","MIN_INSERTION_TIME":"1678403655890000","MAX_INSERTION_TIME":"1678403655890000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
diff --git a/core/src/test/resources/delta/table-with-dv-feature-enabled/part-00000-fe8a3447-d858-4d61-ba30-9549910bc73d-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-feature-enabled/part-00000-fe8a3447-d858-4d61-ba30-9549910bc73d-c000.snappy.parquet
deleted file mode 100644
index 71f64fa364151522c3dbc3a3fa3e4c75d9aaa5a0..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

literal 609
zcmZWn+iKfD5FKwW2vQ2A*<}TS3PDW*_EwVZBr>I7=tEnI2_@}A+7eds*b&ji(aLFp
z@n7^?@(p?H7nP1%H-7LwEOXA>&dl!V;8VbXJKW{JAAa5MIh+yP-DG>w@vYIG8)uciF
m80r3$Mdp4`FxOLl_vVt$I@I3iV0?DKsp-<8BTO^_kMU59Z3KwHv(?3GHF@JosQ}$
zALQXigt^WX?c#cdwW`V=D#u!nbgkv*xmVv!(d&Dru4YN?wY+hP*~obPuXC@^`O?$fNiwU7(mSsB5$Vpk)^8VNJ`t!x&`vT;P4GI6Ic&lqXqXjGnIbSJi=ast}!3|Up
zs!n^UEvDUEnYQoyp?eU|(ro00-IK14dZQSFBTak
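
The sketch below is illustrative context for the two patches above, not part of them: it shows the DELETE-with-deletion-vectors path that PATCH 1/2 stops blocking and PATCH 2/2 enables outside of tests. Only the 'delta.feature.deletionVectors' property string and the DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS identifier come from the diffs; the session setup, table location, schema, and row counts are assumptions.

  // Minimal sketch (hypothetical path and values); assumes a SparkSession already
  // configured with the Delta Lake SQL extension and Delta catalog.
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.delta.sources.DeltaSQLConf

  val spark: SparkSession = SparkSession.active
  val path = "/tmp/dv-demo-table" // hypothetical location

  // Property string taken from the removed test suite: marks the deletionVectors
  // table feature as supported on the new table.
  spark.sql(s"CREATE TABLE delta.`$path` (value BIGINT) USING delta " +
    "TBLPROPERTIES ('delta.feature.deletionVectors' = 'supported')")
  spark.sql(s"INSERT INTO delta.`$path` SELECT id AS value FROM range(100)")

  // Conf read by shouldWritePersistentDeletionVectors(); PATCH 2/2 makes it apply
  // outside of Utils.isTesting as well.
  spark.conf.set(DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key, "true")

  // Before PATCH 1/2, both the CREATE above and this DELETE would have thrown
  // UnsupportedOperationException from the removed assertDeletionVectors* helpers.
  spark.sql(s"DELETE FROM delta.`$path` WHERE value < 10")

With only PATCH 1/2 applied, the DELETE above would still rewrite files rather than write deletion vectors, because shouldWritePersistentDeletionVectors kept the Utils.isTesting gate until PATCH 2/2 removed it.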