Skip to content

Commit

Permalink
ISSUE-532 ISSUE-533 Fix handling of self referencing schema.
Browse files Browse the repository at this point in the history
  • Loading branch information
Kai Yang committed Feb 21, 2019
1 parent 27a78de commit 5f89f77
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@
import com.hortonworks.registries.schemaregistry.errors.CyclicSchemaDependencyException;
import com.hortonworks.registries.schemaregistry.errors.InvalidSchemaException;
import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.codehaus.jackson.node.NullNode;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.avro.Schema.Type.RECORD;

Expand Down Expand Up @@ -132,21 +132,19 @@ private String getResultantSchema(String schemaText, Map<String, SchemaParsingSt
Schema.Parser parser = new Schema.Parser();
parser.addTypes(complexTypes);
Schema schema = parser.parse(schemaText);
Set<String> visitingTypes = new HashSet<>();
Map<String, Schema> visitingTypes = new HashMap<>();
Schema updatedSchema = handleUnionFieldsWithNull(schema, visitingTypes);

return (schema == updatedSchema && complexTypes.isEmpty()) ? schemaText : updatedSchema.toString();
}

public Schema handleUnionFieldsWithNull(Schema schema, Set<String> visitingTypes) {
if (visitingTypes.contains(schema.getFullName())) {
public Schema handleUnionFieldsWithNull(Schema schema, Map<String, Schema> visitingTypes) {
if (visitingTypes.containsKey(schema.getFullName())) {
return schema;
}
visitingTypes.add(schema.getFullName());
visitingTypes.put(schema.getFullName(), schema);

Schema updatedRootSchema = schema;
if (schema.getType() == RECORD) {
List<Schema.Field> fields = updatedRootSchema.getFields();
List<Schema.Field> fields = schema.getFields();
List<Schema.Field> updatedFields = new ArrayList<>(fields.size());
boolean hasUnionType = false;

Expand All @@ -155,43 +153,68 @@ public Schema handleUnionFieldsWithNull(Schema schema, Set<String> visitingTypes
// check for union

boolean currentFieldTypeIsUnion = fieldSchema.getType() == Schema.Type.UNION;
if (currentFieldTypeIsUnion) {
// check for the fields with in union
// if it is union and first type is null then set default value as null
if (fieldSchema.getTypes().get(0).getType() == Schema.Type.NULL) {
hasUnionType = true;
}
Schema.Field rebuiltField;
// check for the fields with in union
// if it is union and first type is null then set default value as null
if (currentFieldTypeIsUnion && fieldSchema.getTypes().get(0).getType() == Schema.Type.NULL) {
hasUnionType = true;
visitingTypes.put(schema.getFullName(),
Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError()));
rebuiltField = new Schema.Field(field.name(),
cloneUnion(fieldSchema, visitingTypes),
field.doc(),
field.defaultVal() == null ? JsonProperties.NULL_VALUE : field.defaultVal(),
field.order());
} else {
// go through non-union fields, which may be records
Schema updatedFieldSchema = handleUnionFieldsWithNull(fieldSchema, visitingTypes);
if (fieldSchema != updatedFieldSchema) {
hasUnionType = true;
visitingTypes.put(schema.getFullName(),
Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError()));
rebuiltField = new Schema.Field(field.name(),
updatedFieldSchema,
field.doc(),
field.defaultVal(),
field.order());
} else {
rebuiltField = new Schema.Field(field.name(),
visitingTypes.getOrDefault(fieldSchema.getFullName(), fieldSchema),
field.doc(),
field.defaultVal(),
field.order());
}
}
Schema.Field rebuiltField = new Schema.Field(field.name(),
fieldSchema,
field.doc(),
currentFieldTypeIsUnion ? NullNode.getInstance() : field.defaultValue(),
field.order());
for (Map.Entry<String, Object> prop : field.getObjectProps().entrySet()) {
rebuiltField.addProp(prop.getKey(), prop.getValue());
}
updatedFields.add(rebuiltField);
}

if (hasUnionType) {
updatedRootSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
Schema updatedRootSchema = visitingTypes.get(schema.getFullName());
updatedRootSchema.setFields(updatedFields);
for (String alias : schema.getAliases()) {
updatedRootSchema.addAlias(alias);
}
for (Map.Entry<String, org.codehaus.jackson.JsonNode> nodeEntry : schema.getJsonProps().entrySet()) {
for (Map.Entry<String, Object> nodeEntry : schema.getObjectProps().entrySet()) {
updatedRootSchema.addProp(nodeEntry.getKey(), nodeEntry.getValue());
}
return updatedRootSchema;
}
}

return updatedRootSchema;
return schema;
}

private Schema cloneUnion(Schema fieldSchema, Map<String, Schema> visitingTypes) {
SchemaBuilder.UnionAccumulator<Schema> unionAccumulator = SchemaBuilder.unionOf().nullType();
for (int i = 1; i < fieldSchema.getTypes().size(); i++) {
Schema innerSchema = fieldSchema.getTypes().get(i);
unionAccumulator = unionAccumulator.and()
.type(visitingTypes.getOrDefault(innerSchema.getFullName(), innerSchema));
}
return unionAccumulator.endUnion();
}

private Schema updateUnionFields(Schema schema) {
Expand Down Expand Up @@ -275,6 +298,9 @@ private Map<String, Schema> collectSchemaTypes(SchemaVersionKey schemaVersionKey
}

private void collectComplexTypes(Schema schema, Map<String, Schema> complexTypes) {
if (complexTypes.containsKey(schema.getFullName())) {
return;
}
switch (schema.getType()) {
case RECORD:
complexTypes.put(schema.getFullName(), schema);
Expand All @@ -285,18 +311,15 @@ private void collectComplexTypes(Schema schema, Map<String, Schema> complexTypes
}
break;
case ARRAY:
complexTypes.put(schema.getFullName(), schema);
collectComplexTypes(schema.getElementType(), complexTypes);
break;
case UNION:
complexTypes.put(schema.getFullName(), schema);
List<Schema> unionSchemas = schema.getTypes();
for (Schema schemaEntry : unionSchemas) {
collectComplexTypes(schemaEntry, complexTypes);
}
break;
case MAP:
complexTypes.put(schema.getFullName(), schema);
collectComplexTypes(schema.getValueType(), complexTypes);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public void testComposites() throws Exception {
versions.put("account", cretaeSchemaVersionInfo("/avro/composites/account.avsc"));
versions.put("account-cyclic", cretaeSchemaVersionInfo("/avro/composites/account-cyclic.avsc"));
versions.put("account-dep", cretaeSchemaVersionInfo("/avro/composites/account-dep.avsc"));
versions.put("node", cretaeSchemaVersionInfo("/avro/composites/node.avsc"));

avroSchemaProvider = new AvroSchemaProvider();
SchemaVersionRetriever schemaVersionRetriever = new SchemaVersionRetriever() {
Expand Down Expand Up @@ -94,6 +95,15 @@ public void testCyclicSchema() throws Exception {
String accountSchemaText = avroSchemaProvider.getResultantSchema(getResourceText("/avro/composites/account-ref-cyclic.avsc"));
}

@Test
public void testSelfRefAndUnions() throws Exception {
String linkedListSchemaText = avroSchemaProvider.getResultantSchema(getResourceText("/avro/composites/linked-list.avsc"));
LOG.info("linkedListSchemaText [{}] ", linkedListSchemaText);
Schema.Parser parser = new Schema.Parser();
Schema parsedSchema = parser.parse(linkedListSchemaText);
LOG.info("parsedSchema [{}] ", parsedSchema);
}

@Test
public void testUnionSchemas() throws Exception {
String givenSchemaLocation = "/avro/composites/unions.avsc";
Expand All @@ -107,7 +117,7 @@ private void doTestSchemaResolution(String givenSchemaLocation, String expectedS
Schema schema = new Schema.Parser().parse(getResourceText(givenSchemaLocation));
LOG.info("schema = %s", schema);

Schema effectiveSchema = avroSchemaResolver.handleUnionFieldsWithNull(schema, new HashSet<>());
Schema effectiveSchema = avroSchemaResolver.handleUnionFieldsWithNull(schema, new HashMap<>());
LOG.info("effectiveSchema = %s", effectiveSchema);
String returnedSchemaText = effectiveSchema.toString();
Assert.assertEquals(getResourceText(expectedSchemaLocation).replace(" ", ""),
Expand All @@ -129,10 +139,10 @@ public void testSchemasWithDefaults() throws Exception {
public void testUnionSchemasPropRetention() throws Exception {
AvroSchemaResolver avroSchemaResolver = new AvroSchemaResolver(null);
Schema schema = new Schema.Parser().parse(getResourceText("/avro/composites/unions-with-props.avsc"));
LOG.info("schema = %s", schema);
LOG.info("schema = {}", schema);

Schema effectiveSchema = avroSchemaResolver.handleUnionFieldsWithNull(schema, new HashSet<>());
LOG.info("effectiveSchema = %s", effectiveSchema);
Schema effectiveSchema = avroSchemaResolver.handleUnionFieldsWithNull(schema, new HashMap<>());
LOG.info("effectiveSchema = {}", effectiveSchema);
String returnedSchemaText = effectiveSchema.toString();
Assert.assertEquals("foo", effectiveSchema.getField("name").getProp("someProp"));
Assert.assertEquals("bar", effectiveSchema.getField("address").getProp("otherProp"));
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"type":"record","name":"unions","namespace":"com.hortonworks.types","fields":[{"name":"name","type":"string"},{"name":"address","type":{"type":"record","name":"addressType","fields":[{"name":"houseNo","type":"string"},{"name":"street","type":"string"},{"name":"pincode","type":["null","long"]},{"name":"subAddress","type":"addressType"}]}},{"name":"secondaryAddress","type":["null","addressType"],"default":null}]}
{"type":"record","name":"unions","namespace":"com.hortonworks.types","fields":[{"name":"name","type":"string"},{"name":"address","type":{"type":"record","name":"addressType","fields":[{"name":"houseNo","type":"string"},{"name":"street","type":"string"},{"name":"pincode","type":["null","long"],"default": null},{"name":"subAddress","type":"addressType"}]}},{"name":"secondaryAddress","type":["null","addressType"],"default":null}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "linked_list",
"namespace": "com.hortonworks.types",
"includeSchemas": [
{
"name": "node"
}
],
"type": "record",
"fields": [
{
"name": "list",
"type": "com.hortonworks.types.node"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"name": "node",
"namespace": "com.hortonworks.types",
"type": "record",
"fields": [
{
"name": "data",
"type": [
"null",
"long"
]
},
{
"name": "next",
"type": [
"null",
"com.hortonworks.types.node"
]
}
]
}

0 comments on commit 5f89f77

Please sign in to comment.