
Commit ab237d3

Manifest generation blocking when DVs are present

GitOrigin-RevId: 95d4ea5d36e55315ae07795021a0473850bd4f56
1 parent 553476d commit ab237d3

9 files changed: +328, -9 lines
core/src/main/resources/error/delta-error-classes.json (+31)

@@ -1876,6 +1876,13 @@
     ],
     "sqlState" : "0A000"
   },
+  "DELTA_UNSUPPORTED_GENERATE_WITH_DELETION_VECTORS" : {
+    "message" : [
+      "The 'GENERATE symlink_format_manifest' command is not supported on table versions with deletion vectors.",
+      "If you need to generate manifests, consider disabling deletion vectors on this table using 'ALTER TABLE table SET TBLPROPERTIES (delta.enableDeletionVectors = false)'."
+    ],
+    "sqlState" : "0A000"
+  },
   "DELTA_UNSUPPORTED_INVARIANT_NON_STRUCT" : {
     "message" : [
       "Invariants on nested fields other than StructTypes are not supported."
@@ -2009,6 +2016,30 @@
     ],
     "sqlState" : "23001"
   },
+  "DELTA_VIOLATE_TABLE_PROPERTY_VALIDATION_FAILED" : {
+    "message" : [
+      "The validation of the properties of table <table> has been violated:"
+    ],
+    "subClass" : {
+      "EXISTING_DELETION_VECTORS_WITH_INCREMENTAL_MANIFEST_GENERATION" : {
+        "message" : [
+          "Symlink manifest generation is unsupported while deletion vectors are present in the table.",
+          "In order to produce a version of the table without deletion vectors, run 'REORG TABLE <table> APPLY (PURGE)'."
+        ]
+      },
+      "PERSISTENT_DELETION_VECTORS_IN_NON_PARQUET_TABLE" : {
+        "message" : [
+          "Persistent deletion vectors are only supported on Parquet-based Delta tables."
+        ]
+      },
+      "PERSISTENT_DELETION_VECTORS_WITH_INCREMENTAL_MANIFEST_GENERATION" : {
+        "message" : [
+          "Persistent deletion vectors and incremental symlink manifest generation are mutually exclusive."
+        ]
+      }
+    },
+    "sqlState" : "0A000"
+  },
   "DELTA_ZORDERING_COLUMN_DOES_NOT_EXIST" : {
     "message" : [
       "Z-Ordering column <columnName> does not exist in data schema."

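For illustration (not part of this commit): a hypothetical SQL-level repro of the new error class, assuming a Delta table named `events` whose current version contains deletion vectors.

// Hypothetical repro; `events` is a stand-in table name.
spark.sql("GENERATE symlink_format_manifest FOR TABLE events")
// => DeltaCommandUnsupportedWithDeletionVectorsException with error class
//    DELTA_UNSUPPORTED_GENERATE_WITH_DELETION_VECTORS:
//    "The 'GENERATE symlink_format_manifest' command is not supported on
//     table versions with deletion vectors. ..."
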
core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala (+47)

@@ -2644,6 +2644,10 @@ trait DeltaErrorsBase
       messageParameters = Array.empty,
       pos = 0)
   }
+
+  def generateNotSupportedWithDeletionVectors(): Throwable =
+    new DeltaCommandUnsupportedWithDeletionVectorsException(
+      errorClass = "DELTA_UNSUPPORTED_GENERATE_WITH_DELETION_VECTORS")
 }
 
 object DeltaErrors extends DeltaErrorsBase
@@ -2923,6 +2927,49 @@ class DeltaNoSuchTableException(
   override def getErrorClass: String = errorClass
 }
 
+class DeltaCommandUnsupportedWithDeletionVectorsException(
+    errorClass: String,
+    messageParameters: Array[String] = Array.empty)
+  extends UnsupportedOperationException(
+    DeltaThrowableHelper.getMessage(errorClass, messageParameters))
+  with DeltaThrowable {
+  override def getErrorClass: String = errorClass
+}
+
+sealed trait DeltaTablePropertyValidationFailedSubClass {
+  def tag: String
+  /** Can be overridden in case subclasses need the table name as well. */
+  def messageParameters(table: String): Array[String] = Array(table)
+}
+
+final object DeltaTablePropertyValidationFailedSubClass {
+  final case object PersistentDeletionVectorsWithIncrementalManifestGeneration
+    extends DeltaTablePropertyValidationFailedSubClass {
+    override val tag = "PERSISTENT_DELETION_VECTORS_WITH_INCREMENTAL_MANIFEST_GENERATION"
+  }
+  final case object ExistingDeletionVectorsWithIncrementalManifestGeneration
+    extends DeltaTablePropertyValidationFailedSubClass {
+    override val tag = "EXISTING_DELETION_VECTORS_WITH_INCREMENTAL_MANIFEST_GENERATION"
+    /** This subclass needs the table parameters in two places. */
+    override def messageParameters(table: String): Array[String] = Array(table, table)
+  }
+  final case object PersistentDeletionVectorsInNonParquetTable
+    extends DeltaTablePropertyValidationFailedSubClass {
+    override val tag = "PERSISTENT_DELETION_VECTORS_IN_NON_PARQUET_TABLE"
+  }
+}
+
+class DeltaTablePropertyValidationFailedException(
+    table: String,
+    subClass: DeltaTablePropertyValidationFailedSubClass)
+  extends RuntimeException(DeltaThrowableHelper.getMessage(
+    errorClass = "DELTA_VIOLATE_TABLE_PROPERTY_VALIDATION_FAILED",
+    messageParameters = subClass.messageParameters(table)))
+  with DeltaThrowable {
+  override def getErrorClass: String =
+    "DELTA_VIOLATE_TABLE_PROPERTY_VALIDATION_FAILED." + subClass.tag
+}
+
 /** Errors thrown around column mapping. */
 class ColumnMappingUnsupportedException(msg: String)
   extends UnsupportedOperationException(msg)

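For illustration (not part of this commit): a minimal sketch of how the subclass tag composes the error class a caller observes. All names come from the diff above; the table name "events" is hypothetical.

import org.apache.spark.sql.delta.{DeltaTablePropertyValidationFailedException, DeltaTablePropertyValidationFailedSubClass}

val e = new DeltaTablePropertyValidationFailedException(
  table = "events", // hypothetical table name
  subClass = DeltaTablePropertyValidationFailedSubClass
    .ExistingDeletionVectorsWithIncrementalManifestGeneration)

// The reported error class is the parent error class plus the subclass tag:
assert(e.getErrorClass ==
  "DELTA_VIOLATE_TABLE_PROPERTY_VALIDATION_FAILED." +
    "EXISTING_DELETION_VECTORS_WITH_INCREMENTAL_MANIFEST_GENERATION")
// This subclass overrides messageParameters to return Array("events", "events"),
// filling both <table> placeholders in the message template.
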
core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala (+3)

@@ -554,6 +554,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       }
     }
 
+    if (spark.conf.get(DeltaSQLConf.DELTA_TABLE_PROPERTY_CONSTRAINTS_CHECK_ENABLED)) {
+      Protocol.assertTablePropertyConstraintsSatisfied(spark, metadata, snapshot)
+    }
 
     val needsProtocolUpdate = Protocol.checkProtocolRequirements(spark, metadata, protocol)
     if (needsProtocolUpdate.isDefined) {

core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala (+44, -4)

@@ -26,7 +26,7 @@ import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.delta._
-import org.apache.spark.sql.delta.constraints.Constraints
+import org.apache.spark.sql.delta.commands.DeletionVectorUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.JsonUtils
 import com.fasterxml.jackson.annotation._
@@ -36,11 +36,10 @@ import com.fasterxml.jackson.databind._
 import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoder, SparkSession}
+import org.apache.spark.sql.{Column, Encoder, SparkSession}
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
 import org.apache.spark.util.Utils
 
@@ -339,6 +338,46 @@ object Protocol {
     }
   }
 
+  /**
+   * Verify that the table properties satisfy legality constraints. Throw an exception if not.
+   */
+  def assertTablePropertyConstraintsSatisfied(
+      spark: SparkSession,
+      metadata: Metadata,
+      snapshot: Snapshot): Unit = {
+    import DeltaTablePropertyValidationFailedSubClass._
+
+    val tableName = if (metadata.name != null) metadata.name else metadata.id
+
+    val configs = metadata.configuration.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }
+    val dvsEnabled = {
+      val lowerCaseKey = DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key.toLowerCase(Locale.ROOT)
+      configs.get(lowerCaseKey).exists(_.toBoolean)
+    }
+    if (dvsEnabled && metadata.format.provider != "parquet") {
+      // DVs only work with parquet-based delta tables.
+      throw new DeltaTablePropertyValidationFailedException(
+        table = tableName,
+        subClass = PersistentDeletionVectorsInNonParquetTable)
+    }
+    val manifestGenerationEnabled = {
+      val lowerCaseKey = DeltaConfigs.SYMLINK_FORMAT_MANIFEST_ENABLED.key.toLowerCase(Locale.ROOT)
+      configs.get(lowerCaseKey).exists(_.toBoolean)
+    }
+    if (dvsEnabled && manifestGenerationEnabled) {
+      throw new DeltaTablePropertyValidationFailedException(
+        table = tableName,
+        subClass = PersistentDeletionVectorsWithIncrementalManifestGeneration)
+    }
+    if (manifestGenerationEnabled) {
+      // Only allow enabling this, if there are no DVs present.
+      if (!DeletionVectorUtils.isTableDVFree(spark, snapshot)) {
+        throw new DeltaTablePropertyValidationFailedException(
+          table = tableName,
+          subClass = ExistingDeletionVectorsWithIncrementalManifestGeneration)
+      }
+    }
+  }
 }
 
 /**
@@ -406,7 +445,8 @@ case class AddFile(
     // scalastyle:off
     val removedFile = RemoveFile(
       path, Some(timestamp), dataChange,
-      extendedFileMetadata = Some(true), partitionValues, Some(size), newTags
+      extendedFileMetadata = Some(true), partitionValues, Some(size), newTags,
+      deletionVector = deletionVector
     )
     // scalastyle:on
     removedFile

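For illustration (not part of this commit): a hypothetical repro of the commit-time validation, assuming a table `events` that already has 'delta.enableDeletionVectors' = 'true', and assuming DeltaConfigs.SYMLINK_FORMAT_MANIFEST_ENABLED maps to the property key 'delta.compatibility.symlinkFormatManifest.enabled'.

// Hypothetical: enabling incremental manifests on a DV-enabled table now
// fails the transaction instead of committing an unusable combination.
spark.sql("""
  ALTER TABLE events
  SET TBLPROPERTIES ('delta.compatibility.symlinkFormatManifest.enabled' = 'true')
""")
// => DeltaTablePropertyValidationFailedException with error class
//    DELTA_VIOLATE_TABLE_PROPERTY_VALIDATION_FAILED
//      .PERSISTENT_DELETION_VECTORS_WITH_INCREMENTAL_MANIFEST_GENERATION
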
core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala (+6, -3)

@@ -306,10 +306,13 @@ class OptimizeExecutor(
         bins += currentBin.toVector
       }
 
+      bins.filter { bin =>
+        bin.size > 1 || // bin has more than one file or
+        (bin.size == 1 && bin(0).deletionVector != null) || // single file in the bin has a DV or
+        isMultiDimClustering // multi-clustering
+      }
+
       bins.map(b => (partition, b))
-        // select bins that have at least two files or in case of multi-dim clustering
-        // select all bins
-        .filter(_._2.size > 1 || isMultiDimClustering)
     }
   }
 

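For illustration (not part of this commit): the new bin selection boils down to the predicate below, shown as a simplified, self-contained sketch with a stub standing in for AddFile.

case class FileStub(deletionVector: AnyRef) // stand-in for AddFile

def shouldCompact(bin: Seq[FileStub], isMultiDimClustering: Boolean): Boolean =
  bin.size > 1 ||                                       // several files: worth compacting
  (bin.size == 1 && bin.head.deletionVector != null) || // lone file carrying a DV: rewrite it
  isMultiDimClustering                                  // multi-dim clustering selects all bins
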
core/src/main/scala/org/apache/spark/sql/delta/hooks/GenerateSymlinkManifest.scala (+9)

@@ -21,6 +21,7 @@ import java.net.URI
 
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions._
+import org.apache.spark.sql.delta.commands.DeletionVectorUtils.isTableDVFree
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.storage.LogStore
 import org.apache.spark.sql.delta.util.DeltaFileOperations
@@ -78,6 +79,7 @@ trait GenerateSymlinkManifestImpl extends PostCommitHook with DeltaLogging with
   override def handleError(error: Throwable, version: Long): Unit = {
     error match {
       case e: ColumnMappingUnsupportedException => throw e
+      case e: DeltaCommandUnsupportedWithDeletionVectorsException => throw e
       case _ =>
         throw DeltaErrors.postCommitHookFailedException(this, version, name, error)
     }
@@ -178,6 +180,7 @@ trait GenerateSymlinkManifestImpl extends PostCommitHook with DeltaLogging with
       spark: SparkSession,
       deltaLog: DeltaLog): Unit = {
     val snapshot = deltaLog.update(stalenessAcceptable = false)
+    assertTableIsDVFree(spark, snapshot)
     generateFullManifestWithSnapshot(spark, deltaLog, snapshot)
   }
 
@@ -241,6 +244,12 @@ trait GenerateSymlinkManifestImpl extends PostCommitHook with DeltaLogging with
     recordDeltaEvent(deltaLog, s"$FULL_MANIFEST_OP_TYPE.stats", data = stats)
   }
 
+  protected def assertTableIsDVFree(spark: SparkSession, snapshot: Snapshot): Unit = {
+    if (!isTableDVFree(spark, snapshot)) {
+      throw DeltaErrors.generateNotSupportedWithDeletionVectors()
+    }
+  }
+
   /**
    * Write the manifest files and return the partition relative paths of the manifests written.
    *

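For illustration (not part of this commit): the extra `case` only changes how the failure propagates out of the post-commit hook. A simplified sketch of that contract, with a hypothetical wrapper standing in for DeltaErrors.postCommitHookFailedException:

def dispatch(error: Throwable): Nothing = error match {
  // Actionable: rethrown unwrapped, so callers see the DV-specific error class.
  case e: DeltaCommandUnsupportedWithDeletionVectorsException => throw e
  // Everything else is wrapped as a generic post-commit-hook failure.
  case other => throw new RuntimeException("post-commit hook failed", other)
}
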
core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala (+9)

@@ -1091,6 +1091,15 @@ trait DeltaSQLConfBase
       .checkValue(_ >= 0, "maxDeletedRowsRatio must be in range [0.0, 1.0]")
       .checkValue(_ <= 1, "maxDeletedRowsRatio must be in range [0.0, 1.0]")
       .createWithDefault(0.05d)
+
+  val DELTA_TABLE_PROPERTY_CONSTRAINTS_CHECK_ENABLED =
+    buildConf("tablePropertyConstraintsCheck.enabled")
+      .internal()
+      .doc(
+        """Check that all table-properties satisfy validity constraints.
+          |Only change this for testing!""".stripMargin)
+      .booleanConf
+      .createWithDefault(true)
 }
 
 object DeltaSQLConf extends DeltaSQLConfBase

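For illustration (not part of this commit): a test-only sketch of switching the new check off, assuming the usual DeltaSQLConf key prefix (spark.databricks.delta.) and Spark's SQLTestUtils.withSQLConf helper.

withSQLConf(
    DeltaSQLConf.DELTA_TABLE_PROPERTY_CONSTRAINTS_CHECK_ENABLED.key -> "false") {
  // Commits in this scope skip Protocol.assertTablePropertyConstraintsSatisfied,
  // e.g. to construct otherwise-illegal table states for regression tests.
}
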
core/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala (+7)

@@ -104,6 +104,13 @@ trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession {
     }
   }
 
+  /** Enable persistent Deletion Vectors in a Delta table. */
+  def enableDeletionVectorsInTable(tablePath: Path, enable: Boolean): Unit =
+    spark.sql(
+      s"""ALTER TABLE delta.`$tablePath`
+         |SET TBLPROPERTIES ('${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = '$enable')
+         |""".stripMargin)
+
   // ======== HELPER METHODS TO WRITE DVs ==========
   /** Helper method to remove the specified rows in the given file using DVs */
   protected def removeRowsFromFileUsingDV(

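For illustration (not part of this commit): hypothetical usage of the new helper in a test; the table path is a stand-in.

val tablePath = new org.apache.hadoop.fs.Path("/tmp/delta/events") // hypothetical path
enableDeletionVectorsInTable(tablePath, enable = true)  // sets delta.enableDeletionVectors = true
enableDeletionVectorsInTable(tablePath, enable = false) // or flips it back off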