diff --git a/core/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/deltaMerge.scala b/core/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/deltaMerge.scala
index 9b67283821d..fbf8f9d2483 100644
--- a/core/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/deltaMerge.scala
+++ b/core/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/deltaMerge.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaIllegalArgumentE
 import org.apache.spark.sql.delta.schema.SchemaMergingUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnaryExpression}
@@ -429,7 +430,6 @@ object DeltaMergeInto {
     }

     val canAutoMigrate = conf.getConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE)
-
     /**
      * Resolves a clause using the given plan (used for resolving the action exprs) and
      * returns the resolved clause.
@@ -516,6 +516,14 @@ object DeltaMergeInto {
             resolveOrFail(unresolvedAttrib, fakeTargetPlan, s"$typ clause"),
             resolutionErrorMsg)
         } catch {
+          // Allow schema evolution for UPDATE and INSERT non-star actions when the column is
+          // not in the target: fall back to resolving the column against the source.
+          case _: AnalysisException
+              if canAutoMigrate && (clause.isInstanceOf[DeltaMergeIntoMatchedUpdateClause] ||
+                clause.isInstanceOf[DeltaMergeIntoNotMatchedClause]) =>
+            DeltaUpdateTable.getTargetColNameParts(
+              resolveOrFail(unresolvedAttrib, fakeSourcePlan, s"$typ clause"),
+              resolutionErrorMsg)
           case e: Throwable => throw e
         }
       }
@@ -548,20 +556,55 @@ object DeltaMergeInto {
     val resolvedNotMatchedBySourceClauses = notMatchedBySourceClauses.map {
       resolveClause(_, fakeTargetPlan)
     }
-    val actions = (matchedClauses ++ notMatchedClauses ++ notMatchedBySourceClauses)
-      .flatMap(_.actions)
-    val containsStarAction = actions.exists(_.isInstanceOf[UnresolvedStar])
-    val migrateSchema = canAutoMigrate && containsStarAction
+    val finalSchema = if (canAutoMigrate) {
+      // When schema evolution is enabled, add to the target table any new columns or nested
+      // fields that are assigned to in merge actions and not already part of the target schema.
+      // This is done by collecting all assignments from merge actions and using them to filter
+      // the source schema before merging it with the target schema. We don't consider
+      // NOT MATCHED BY SOURCE clauses since these by definition can't reference source columns
+      // and thus can't introduce new columns in the target schema.
+      val actions = (matchedClauses ++ notMatchedClauses).flatMap(_.actions)
+      val assignments = actions.collect { case a: DeltaMergeAction => a.targetColNameParts }
+      val containsStarAction = actions.exists {
+        case _: UnresolvedStar => true
+        case _ => false
+      }

-    val finalSchema = if (migrateSchema) {
-      var sourceSchema = source.schema
+      // Filter the source schema to retain only fields that are referenced by at least one
+      // merge clause, then merge this schema with the target to give the final schema.
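+      // For example (an illustration, not an exhaustive spec): given a source schema
+      //   key STRING, value STRUCT<a INT, b INT, c INT>
+      // and the single clause UPDATE SET value.b = s.value.b (no star actions), the only
+      // assignment is Seq("value", "b"), so the filtered schema is value STRUCT<b INT>;
+      // key, value.a and value.c are never assigned to and are dropped before the merge
+      // with the target schema.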
+      def filterSchema(sourceSchema: StructType, basePath: Seq[String]): StructType =
+        StructType(sourceSchema.flatMap { field =>
+          val fieldPath = basePath :+ field.name.toLowerCase(Locale.ROOT)
+          val childAssignedInMergeClause = assignments.exists(_.startsWith(fieldPath))
+
+          field.dataType match {
+            // Specifically assigned to in one clause: always keep, including all nested
+            // attributes.
+            case _ if assignments.contains(fieldPath) => Some(field)
+            // If this is a struct and one of its children is assigned to in a merge clause,
+            // keep it and continue filtering children.
+            case struct: StructType if childAssignedInMergeClause =>
+              Some(field.copy(dataType = filterSchema(struct, fieldPath)))
+            // The field isn't assigned to directly or indirectly (i.e. via its children) in
+            // any non-* clause. Check if it should be kept by any * action.
+            case struct: StructType if containsStarAction =>
+              Some(field.copy(dataType = filterSchema(struct, fieldPath)))
+            case _ if containsStarAction => Some(field)
+            // The field and its children are not assigned to in any * or non-* action, drop it.
+            case _ => None
+          }
+        })
+
+      val migrationSchema = filterSchema(source.schema, Seq.empty)
       // The implicit conversions flag allows any type to be merged from source to target if Spark
       // SQL considers the source type implicitly castable to the target. Normally, mergeSchemas
       // enforces Parquet-level write compatibility, which would mean an INT source can't be merged
       // into a LONG target.
-      SchemaMergingUtils.mergeSchemas(target.schema, sourceSchema, allowImplicitConversions = true)
+      SchemaMergingUtils.mergeSchemas(
+        target.schema,
+        migrationSchema,
+        allowImplicitConversions = true)
     } else {
       target.schema
     }
@@ -573,7 +616,7 @@ object DeltaMergeInto {
       resolvedMatchedClauses,
       resolvedNotMatchedClauses,
       resolvedNotMatchedBySourceClauses,
-      migrateSchema = migrateSchema,
+      migrateSchema = canAutoMigrate,
       finalSchema = Some(finalSchema))

     // It's possible that pre-resolved expressions (e.g. `sourceDF("key") = targetDF("key")`) have
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoNotMatchedBySourceSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoNotMatchedBySourceSuite.scala
index f850648f850..bdc90f324cb 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoNotMatchedBySourceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoNotMatchedBySourceSuite.scala
@@ -535,4 +535,13 @@ trait MergeIntoNotMatchedBySourceSuite extends MergeIntoSuiteBase {
       (3, 31, null) // Not matched by source, updated
     ).toDF("key", "value", "extra"),
     expectedWithoutEvolution = Seq((0, 0), (1, 1), (3, 31)).toDF("key", "value"))
+
+  // Migrating a new column via WHEN NOT MATCHED BY SOURCE is not allowed.
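+  // NOT MATCHED BY SOURCE clauses by definition can't reference source columns, so `extra`
+  // fails to resolve here even with schema evolution enabled.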
+ testEvolution("update new column with not matched by source fails")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra3"), (2, 2, "extra2")).toDF("key", "value", "extra"), + clauses = updateNotMatched("extra = s.extra") :: Nil, + expectErrorContains = "cannot resolve extra in UPDATE clause", + expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause") + } diff --git a/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala b/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala index 0de59e78a00..0826bbedf82 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala @@ -2550,6 +2550,251 @@ abstract class MergeIntoSuiteBase expectedWithoutEvolution = ((0, 0) +: (3, 30) +: (1, 1) +: Nil).toDF("key", "value") ) + // Schema evolution with UPDATE SET alone + testEvolution("new column with update set")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"), + clauses = update(set = "key = s.key, value = s.value, extra = s.extra") :: Nil, + expected = ((0, 0, null) +: (3, 30, null) +: (1, 1, "extra1") +: Nil) + .toDF("key", "value", "extra"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause") + + testEvolution("new column updated with value from existing column")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, -1), (2, 2, -2)) + .toDF("key", "value", "extra"), + clauses = update(set = "extra = s.value") :: Nil, + expected = ((0, 0, null) +: (1, 10, 1) +: (3, 30, null) +: Nil) + .asInstanceOf[List[(Integer, Integer, Integer)]] + .toDF("key", "value", "extra"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause") + + // Schema evolution with INSERT alone + testEvolution("new column with insert values")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"), + clauses = insert(values = "(key, value, extra) VALUES (s.key, s.value, s.extra)") :: Nil, + expected = ((0, 0, null) +: (1, 10, null) +: (3, 30, null) +: (2, 2, "extra2") +: Nil) + .toDF("key", "value", "extra"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in INSERT clause") + + testEvolution("new column inserted with value from existing column")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, -1), (2, 2, -2)) + .toDF("key", "value", "extra"), + clauses = insert(values = "(key, extra) VALUES (s.key, s.value)") :: Nil, + expected = ((0, 0, null) +: (1, 10, null) +: (3, 30, null) +: (2, null, 2) +: Nil) + .asInstanceOf[List[(Integer, Integer, Integer)]] + .toDF("key", "value", "extra"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in INSERT clause") + + // Schema evolution (UPDATE) with two new columns in the source but only one added to the target. 
+ testEvolution("new column with update set and column not updated")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1", "unused1"), (2, 2, "extra2", "unused2")) + .toDF("key", "value", "extra", "unused"), + clauses = update(set = "extra = s.extra") :: Nil, + expected = ((0, 0, null) +: (1, 10, "extra1") +: (3, 30, null) +: Nil) + .asInstanceOf[List[(Integer, Integer, String)]] + .toDF("key", "value", "extra"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause") + + testEvolution("new column updated from other new column")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1", "unused1"), (2, 2, "extra2", "unused2")) + .toDF("key", "value", "extra", "unused"), + clauses = update(set = "extra = s.unused") :: Nil, + expected = ((0, 0, null) +: (1, 10, "unused1") +: (3, 30, null) +: Nil) + .asInstanceOf[List[(Integer, Integer, String)]] + .toDF("key", "value", "extra"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause") + + // Schema evolution (INSERT) with two new columns in the source but only one added to the target. + testEvolution("new column with insert values and column not inserted")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1", "unused1"), (2, 2, "extra2", "unused2")) + .toDF("key", "value", "extra", "unused"), + clauses = insert(values = "(key, extra) VALUES (s.key, s.extra)") :: Nil, + expected = ((0, 0, null) +: (1, 10, null) +: (3, 30, null) +: (2, null, "extra2") +: Nil) + .asInstanceOf[List[(Integer, Integer, String)]] + .toDF("key", "value", "extra"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in INSERT clause") + + testEvolution("new column inserted from other new column")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1", "unused1"), (2, 2, "extra2", "unused2")) + .toDF("key", "value", "extra", "unused"), + clauses = insert(values = "(key, extra) VALUES (s.key, s.unused)") :: Nil, + expected = ((0, 0, null) +: (1, 10, null) +: (3, 30, null) +: (2, null, "unused2") +: Nil) + .asInstanceOf[List[(Integer, Integer, String)]] + .toDF("key", "value", "extra"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in INSERT clause") + + // Schema evolution with two new columns added by UPDATE and INSERT resp. 
+ testEvolution("new column added by insert and other new column added by update")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1", "other1"), (2, 2, "extra2", "other2")) + .toDF("key", "value", "extra", "other"), + clauses = update(set = "extra = s.extra") :: + insert(values = "(key, other) VALUES (s.key, s.other)") :: Nil, + expected = + ((0, 0, null, null) +: + (1, 10, "extra1", null) +: + (3, 30, null, null) +: + (2, null, null, "other2") +: Nil) + .asInstanceOf[List[(Integer, Integer, String, String)]] + .toDF("key", "value", "extra", "other"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause") + + // Nested Schema evolution with UPDATE alone + testNestedStructsEvolution("new nested source field added when updating top-level column")( + target = """{ "key": "A", "value": { "a": 1 }""", + source = """{ "key": "A", "value": { "a": 2, "b": 3 }""", + targetSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType)), + sourceSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType) + .add("b", IntegerType)), + clauses = update("value = s.value") :: Nil, + result = """{ "key": "A", "value": { "a": 2, "b": 3 }""", + resultSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType) + .add("b", IntegerType)), + expectErrorWithoutEvolutionContains = "Cannot cast") + + testNestedStructsEvolution("new nested source field not in update is ignored")( + target = """{ "key": "A", "value": { "a": 1 }""", + source = """{ "key": "A", "value": { "a": 2, "b": 3 }""", + targetSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType)), + sourceSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType) + .add("b", IntegerType)), + clauses = update("value.a = s.value.a") :: Nil, + result = """{ "key": "A", "value": { "a": 2 }""", + resultWithoutEvolution = """{ "key": "A", "value": { "a": 2 }""") + + testNestedStructsEvolution("two new nested source fields with update: one added, one ignored")( + target = """{ "key": "A", "value": { "a": 1 }""", + source = """{ "key": "A", "value": { "a": 2, "b": 3, "c": 4 }""", + targetSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType)), + sourceSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType) + .add("b", IntegerType) + .add("c", IntegerType)), + clauses = update("value.b = s.value.b") :: Nil, + result = """{ "key": "A", "value": { "a": 1, "b": 3 }""", + resultSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType) + .add("b", IntegerType)), + expectErrorWithoutEvolutionContains = "No such struct field") + + + // Nested Schema evolution with INSERT alone + testNestedStructsEvolution("new nested source field added when inserting top-level column")( + target = """{ "key": "A", "value": { "a": 1 }""", + source = """{ "key": "B", "value": { "a": 2, "b": 3 }""", + targetSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType)), + sourceSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType) + .add("b", IntegerType)), + clauses = insert("(value) VALUES (s.value)") :: 
+    result =
+      """{ "key": "A", "value": { "a": 1, "b": null } }
+        |{ "key": "B", "value": { "a": 2, "b": 3 } }""".stripMargin,
+    resultSchema = new StructType()
+      .add("key", StringType)
+      .add("value", new StructType()
+        .add("a", IntegerType)
+        .add("b", IntegerType)),
+    expectErrorWithoutEvolutionContains = "Cannot cast")
+
+  testNestedStructsEvolution("insert new nested source field not supported")(
+    target = """{ "key": "A", "value": { "a": 1 } }""",
+    source = """{ "key": "A", "value": { "a": 2, "b": 3, "c": 4 } }""",
+    targetSchema = new StructType()
+      .add("key", StringType)
+      .add("value", new StructType()
+        .add("a", IntegerType)),
+    sourceSchema = new StructType()
+      .add("key", StringType)
+      .add("value", new StructType()
+        .add("a", IntegerType)
+        .add("b", IntegerType)
+        .add("c", IntegerType)),
+    clauses = insert("(value.b) VALUES (s.value.b)") :: Nil,
+    expectErrorContains = "Nested field is not supported in the INSERT clause of MERGE operation",
+    expectErrorWithoutEvolutionContains = "No such struct field")
+
+  // No schema evolution.
+  testEvolution("old column updated from new column")(
+    targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
+    sourceData = Seq((1, 1, -1), (2, 2, -2))
+      .toDF("key", "value", "extra"),
+    clauses = update(set = "value = s.extra") :: Nil,
+    expected = ((0, 0) +: (1, -1) +: (3, 30) +: Nil).toDF("key", "value"),
+    expectedWithoutEvolution = ((0, 0) +: (1, -1) +: (3, 30) +: Nil).toDF("key", "value"))
+
+  testEvolution("old column inserted from new column")(
+    targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
+    sourceData = Seq((1, 1, -1), (2, 2, -2))
+      .toDF("key", "value", "extra"),
+    clauses = insert(values = "(key) VALUES (s.extra)") :: Nil,
+    expected = ((0, 0) +: (1, 10) +: (3, 30) +: (-2, null) +: Nil)
+      .asInstanceOf[List[(Integer, Integer)]]
+      .toDF("key", "value"),
+    expectedWithoutEvolution = ((0, 0) +: (1, 10) +: (3, 30) +: (-2, null) +: Nil)
+      .asInstanceOf[List[(Integer, Integer)]]
+      .toDF("key", "value"))
+
+  testEvolution("new column with insert existing column")(
+    targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
+    sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"),
+    clauses = insert(values = "(key) VALUES (s.key)") :: Nil,
+    expected = ((0, 0) +: (1, 10) +: (2, null) +: (3, 30) +: Nil)
+      .asInstanceOf[List[(Integer, Integer)]]
+      .toDF("key", "value"),
+    expectedWithoutEvolution = ((0, 0) +: (1, 10) +: (2, null) +: (3, 30) +: Nil)
+      .asInstanceOf[List[(Integer, Integer)]]
+      .toDF("key", "value"))
+
+  // Assigning to a column that exists in neither the source nor the target, with UPDATE/INSERT
+  // alone.
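+  // Schema evolution can only add columns that exist in the source, so resolution still fails
+  // even with evolution enabled.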
+ testEvolution("update set nonexistent column")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"), + clauses = update(set = "nonexistent = s.extra") :: Nil, + expectErrorContains = "cannot resolve nonexistent in UPDATE clause", + expectErrorWithoutEvolutionContains = "cannot resolve nonexistent in UPDATE clause") + + testEvolution("insert values nonexistent column")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"), + clauses = insert(values = "(nonexistent) VALUES (s.extra)") :: Nil, + expectErrorContains = "cannot resolve nonexistent in INSERT clause", + expectErrorWithoutEvolutionContains = "cannot resolve nonexistent in INSERT clause") + testEvolution("new column with update set and update *")( targetData = Seq((0, 0), (1, 10), (2, 20)).toDF("key", "value"), sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"),