
Commit 207d8d2

[Hudi] Fix duplicate record naming in schema conversion (#3310)
#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [x] Other (Hudi)

## Description

This PR fixes a bug in the Delta->Hudi schema conversion for structs. Previously, any table conversion whose schema contained at least one struct would fail, because the Avro RecordSchemas were named incorrectly. There were two problems:

1. No namespace was added to each record. For a column with the schema `(myName STRUCT<myName: STRUCT<field1: INT>>)`, the nested struct was placed in the same namespace as its parent, and since **Avro does not allow record schemas with the same name under the same namespace**, Avro schema creation failed even though the Delta schema was valid.
2. Each record was named after its data type instead of its own field name.

Since we represent the Delta schema itself as a struct, any table with at least one struct column produces a nested struct, even with no nesting inside the table. Both records would then be named "struct" under the same namespace (due to problem 1), so even a single struct of ints triggered the duplication error. For example, for a table defined as `CREATE TABLE myTable (col1 STRUCT<field1: INT, field2: STRING>)`, the previous code failed because the overall schema became a record named "struct" containing a nested record also named "struct" in the same namespace.

The conversion now uses namespaces and names each record after its column rather than its column type, which makes it work and keeps it compatible with Spark+Hudi. For this example, the generated Avro schema looks like:

```json
{
  "type": "record",
  "name": "table",
  "fields": [
    {
      "name": "col1",
      "type": [
        "null",
        {
          "type": "record",
          "name": "col1",
          "namespace": "table",
          "fields": [
            { "name": "field1", "type": ["null", "int"] },
            { "name": "field2", "type": ["null", "string"] }
          ]
        }
      ]
    }
  ]
}
```

## How was this patch tested?

Unit test and manually tested with the Hudi SparkSession reader.

## Does this PR introduce _any_ user-facing changes?

No
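The path-based naming idea behind the fix can be sketched without Spark or Avro. The following Python snippet is illustrative only: records are plain dicts rather than Avro `Schema` objects, and `to_avro` and the toy schema representation are hypothetical, but the recursion mirrors how threading `currentPath` through `transform` gives every nested record a unique fully qualified name.

```python
# Illustrative sketch (not the Avro API): names are made unique by
# threading the current path through the recursion, mirroring
# transform(elem, isNullable, currentPath) in HudiSchemaUtils.scala.

def to_avro(schema, current_path):
    """Convert a toy struct schema (dict of field name -> type) into an
    Avro-like record dict; primitives are plain type-name strings."""
    if isinstance(schema, dict):
        # The record is named after the last path segment (the column
        # name), and everything before it becomes the namespace.
        name = current_path.rsplit(".", 1)[-1]
        record = {
            "type": "record",
            "name": name,
            "fields": [
                {"name": f, "type": ["null", to_avro(t, f"{current_path}.{f}")]}
                for f, t in schema.items()
            ],
        }
        if "." in current_path:
            record["namespace"] = current_path.rsplit(".", 1)[0]
        return record
    return schema  # primitive, e.g. "int" or "string"

# A table with one struct column: before the fix, both records were
# named "struct" in the same namespace and Avro rejected the schema.
table = {"col1": {"field1": "int", "field2": "string"}}
avro = to_avro(table, "table")
nested = avro["fields"][0]["type"][1]
print(avro["name"], nested["name"], nested.get("namespace"))
# prints: table col1 table
```

With path-based names the outer record ("table") and the nested record ("col1" in namespace "table") can never collide, no matter how deeply structs are nested or how their fields are named.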
1 parent 87f0685 commit 207d8d2

File tree

2 files changed (+9, -6)


hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiSchemaUtils.scala (+5, -4)

```diff
@@ -33,16 +33,17 @@ object HudiSchemaUtils extends DeltaLogging {
    * Recursively (i.e. for all nested elements) transforms the delta DataType `elem` into its
    * corresponding Avro type.
    */
-  def transform[E <: DataType](elem: E, isNullable: Boolean): Schema = elem match {
+  def transform[E <: DataType](elem: E, isNullable: Boolean, currentPath: String): Schema =
+    elem match {
     case StructType(fields) =>

       val avroFields: util.List[Schema.Field] = fields.map(f =>
         new Schema.Field(
           f.name,
-          transform(f.dataType, f.nullable),
+          transform(f.dataType, f.nullable, s"$currentPath.${f.name}"),
           f.getComment().orNull)).toList.asJava
       finalizeSchema(
-        Schema.createRecord(elem.typeName, null, null, false, avroFields),
+        Schema.createRecord(currentPath, null, null, false, avroFields),
         isNullable)
     // TODO: Add List and Map support: https://github.com/delta-io/delta/issues/2738
     case ArrayType(elementType, containsNull) =>
@@ -57,7 +58,7 @@ object HudiSchemaUtils extends DeltaLogging {
       throw new UnsupportedOperationException(s"Cannot convert Delta type $other to Hudi")
   }

-    transform(deltaSchema, false)
+    transform(deltaSchema, false, "root")
   }

   private def finalizeSchema(targetSchema: Schema, isNullable: Boolean): Schema = {
```

hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala (+4, -2)

```diff
@@ -203,7 +203,8 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
   test("validate various data types") {
     _sparkSession.sql(
       s"""CREATE TABLE `$testTableName` (col1 BIGINT, col2 BOOLEAN, col3 DATE,
-         | col4 DOUBLE, col5 FLOAT, col6 INT, col7 STRING, col8 TIMESTAMP)
+         | col4 DOUBLE, col5 FLOAT, col6 INT, col7 STRING, col8 TIMESTAMP,
+         | col9 STRUCT<field1: INT, field2: STRING>)
          | USING DELTA
          |LOCATION '$testTablePath'
          |TBLPROPERTIES (
@@ -212,7 +213,8 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
     val nowSeconds = Instant.now().getEpochSecond
     _sparkSession.sql(s"INSERT INTO `$testTableName` VALUES (123, true, "
       + s"date(from_unixtime($nowSeconds)), 32.1, 1.23, 456, 'hello world', "
-      + s"timestamp(from_unixtime($nowSeconds)))")
+      + s"timestamp(from_unixtime($nowSeconds)), "
+      + s"named_struct('field1', 789, 'field2', 'hello'))")
     verifyFilesAndSchemaMatch()
   }
```
0 commit comments
