Skip to content

Commit

Permalink
Disallow overwriteSchema with dynamic partition overwrite
Browse files Browse the repository at this point in the history
Disallow overwriteSchema when partitionOverwriteMode is set to dynamic.
Otherwise, the table might become corrupted, because the schema of newly written partitions
would not match that of the non-overwritten partitions.

GitOrigin-RevId: 1012793448c1ffed9a3f8bde507d9fe1ee183803
  • Loading branch information
sabir-akhadov authored and allisonport-db committed May 10, 2023
1 parent 6556d6f commit 579a315
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 2 deletions.
6 changes: 6 additions & 0 deletions core/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -1346,6 +1346,12 @@
],
"sqlState" : "0A000"
},
"DELTA_OVERWRITE_SCHEMA_WITH_DYNAMIC_PARTITION_OVERWRITE" : {
"message" : [
"'overwriteSchema' cannot be used in dynamic partition overwrite mode."
],
"sqlState" : "42613"
},
"DELTA_PARTITION_COLUMN_CAST_FAILED" : {
"message" : [
"Failed to cast value `<value>` to `<dataType>` for partition column `<columnName>`"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2403,6 +2403,12 @@ trait DeltaErrorsBase
)
}

/**
 * Builds the error reported when `overwriteSchema` is requested together with dynamic
 * partition overwrite mode.
 *
 * @return a [[DeltaIllegalArgumentException]] carrying the error class
 *         `DELTA_OVERWRITE_SCHEMA_WITH_DYNAMIC_PARTITION_OVERWRITE`; the caller is
 *         responsible for throwing it.
 */
def overwriteSchemaUsedWithDynamicPartitionOverwrite(): Throwable = {
  val errorClassName = "DELTA_OVERWRITE_SCHEMA_WITH_DYNAMIC_PARTITION_OVERWRITE"
  new DeltaIllegalArgumentException(errorClass = errorClassName)
}

def replaceWhereUsedInOverwrite(): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_REPLACE_WHERE_IN_OVERWRITE", messageParameters = Array.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ case class WriteIntoDelta(
} else options.isDynamicPartitionOverwriteMode
}

// Reject overwriteSchema when dynamic partition overwrite is in effect: only some
// partitions get rewritten, so a changed schema would not match the partitions that are
// left untouched and the table could become corrupted.
if (useDynamicPartitionOverwriteMode && canOverwriteSchema) {
throw DeltaErrors.overwriteSchemaUsedWithDynamicPartitionOverwrite()
}

// Validate partition predicates
var containsDataFilters = false
val replaceWhere = options.replaceWhere.flatMap { replace =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ package org.apache.spark.sql.delta
import java.util.Locale

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.DeltaOptions.PARTITION_OVERWRITE_MODE_OPTION
import org.apache.spark.sql.delta.DeltaOptions.{OVERWRITE_SCHEMA_OPTION, PARTITION_OVERWRITE_MODE_OPTION}
import org.apache.spark.sql.delta.actions.{Action, FileAction}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.FileNames
import org.apache.commons.io.FileUtils
import org.apache.parquet.format.CompressionCodec
Expand Down Expand Up @@ -283,4 +282,23 @@ class DeltaOptionSuite extends QueryTest
}
}
}

// A write that asks for both overwriteSchema and dynamic partition overwrite must fail
// with the dedicated Delta error class instead of being executed.
test("overwriteSchema=true should be invalid with partitionOverwriteMode=dynamic") {
  withTempDir { dir =>
    val targetPath = dir.getAbsolutePath
    val dynamicOverwriteEnabled =
      DeltaSQLConf.DYNAMIC_PARTITION_OVERWRITE_ENABLED.key -> "true"

    val thrown = intercept[DeltaIllegalArgumentException] {
      withSQLConf(dynamicOverwriteEnabled) {
        // Three rows split across two partitions (part = 0 and part = 1).
        val partitioned = Seq(1, 2, 3).toDF.withColumn("part", $"value" % 2)
        val writer = partitioned.write
          .format("delta")
          .mode("overwrite")
          .partitionBy("part")
          .option(PARTITION_OVERWRITE_MODE_OPTION, "dynamic")
          .option(OVERWRITE_SCHEMA_OPTION, "true")
        writer.save(targetPath)
      }
    }

    assert(thrown.getErrorClass == "DELTA_OVERWRITE_SCHEMA_WITH_DYNAMIC_PARTITION_OVERWRITE")
  }
}
}

0 comments on commit 579a315

Please sign in to comment.