Skip to content

Commit 7eaa6e8

Browse files
abhisheknath2011autumnustsrramach
authored
[GOBBLIN-1726] Avro 1.9 upgrade of Gobblin OSS (apache#3581)
* [Branch avro_1_9] Avro 1.9 upgrade compatible change - replaced deprecated public APIs with the compatible APIs (apache#3349) * Avro 1.9 upgrade compatible change - replaced deprecated public APIs with the compatible APIs. * Avro 1.9 upgrade compatible change - Replaced guava library import from avro shaded with direct guava libraries * Applied Gobblin codestyle formatting. Co-authored-by: Lei <[email protected]> * Avro 1.9 upgrade compatible change - upgraded jackson mapper to 2.x and kept jackson mapper 1.x for modules with hive, helix library dependency. (apache#3368) * Avro 1.9 upgrade compatible change - upgraded jackson mapper to 2.x and kept jackson mapper 1.x for modules with hive, helix library dependency. * Changes for upgrade Avro 1.9.2 and leverges hive with avro changes from https://linkedin.jfrog.io/artifactory/gobblin-hive (apache#3458) * Use helper-all v0.2.74 to solve issues around default values. (apache#3469) The latest version of helper-all fixes the issues seen before w.r.t. default values, so we can now revert the code and the *.avsc files back to how they used to be, with two minor exceptions: 1. Check Schema equality using their .toString() representations. Doing it the old way works for two out of the three instances, but one of them fails, for reasons I haven't figured out yet. 2. Add a `"default":null` piece to recursive_schema_1_converted.avsc. This is harmless, and is caused by the fact that the compatibility helper always adds it if it's a valid default for the schema. See the comments for FieldBuilder19.setDefault(): https://github.com/linkedin/avro-util/blob/b9e89c55980ea8e5fd3c8d8da362d7195dd2a99c/helper/impls/helper-impl-19/src/main/java/com/linkedin/avroutil1/compatibility/avro19/FieldBuilder19.java#L69 To verify that the files are otherwise the same as before: ``` $ for file in gobblin-core-base/src/test/resources/converter/*.avsc; do > git show 928e018:$file > /tmp/before > diff <(jq . </tmp/before) <(jq . <$file) > done ``` * [Branch avro_1_9] Avro 1.9 upgrade compatible change - replaced deprecated public APIs with the compatible APIs (apache#3349) * Avro 1.9 upgrade compatible change - replaced deprecated public APIs with the compatible APIs. * Avro 1.9 upgrade compatible change - Replaced guava library import from avro shaded with direct guava libraries * Applied Gobblin codestyle formatting. Co-authored-by: Lei <[email protected]> * Avro 1.9 upgrade compatible change - upgraded jackson mapper to 2.x and kept jackson mapper 1.x for modules with hive, helix library dependency. (apache#3368) * Avro 1.9 upgrade compatible change - upgraded jackson mapper to 2.x and kept jackson mapper 1.x for modules with hive, helix library dependency. * Changes for upgrade Avro 1.9.2 and leverges hive with avro changes from https://linkedin.jfrog.io/artifactory/gobblin-hive (apache#3458) * Use helper-all v0.2.74 to solve issues around default values. (apache#3469) The latest version of helper-all fixes the issues seen before w.r.t. default values, so we can now revert the code and the *.avsc files back to how they used to be, with two minor exceptions: 1. Check Schema equality using their .toString() representations. Doing it the old way works for two out of the three instances, but one of them fails, for reasons I haven't figured out yet. 2. Add a `"default":null` piece to recursive_schema_1_converted.avsc. This is harmless, and is caused by the fact that the compatibility helper always adds it if it's a valid default for the schema. See the comments for FieldBuilder19.setDefault(): https://github.com/linkedin/avro-util/blob/b9e89c55980ea8e5fd3c8d8da362d7195dd2a99c/helper/impls/helper-impl-19/src/main/java/com/linkedin/avroutil1/compatibility/avro19/FieldBuilder19.java#L69 To verify that the files are otherwise the same as before: ``` $ for file in gobblin-core-base/src/test/resources/converter/*.avsc; do > git show 928e018:$file > /tmp/before > diff <(jq . </tmp/before) <(jq . <$file) > done ``` * [Branch avro_1_9] Avro 1.9 upgrade compatible change - replaced deprecated public APIs with the compatible APIs (apache#3349) * Avro 1.9 upgrade compatible change - replaced deprecated public APIs with the compatible APIs. * Avro 1.9 upgrade compatible change - Replaced guava library import from avro shaded with direct guava libraries * Applied Gobblin codestyle formatting. Co-authored-by: Lei <[email protected]> * Avro 1.9 upgrade compatible change - upgraded jackson mapper to 2.x and kept jackson mapper 1.x for modules with hive, helix library dependency. (apache#3368) * Avro 1.9 upgrade compatible change - upgraded jackson mapper to 2.x and kept jackson mapper 1.x for modules with hive, helix library dependency. * Changes for upgrade Avro 1.9.2 and leverges hive with avro changes from https://linkedin.jfrog.io/artifactory/gobblin-hive (apache#3458) * Use helper-all v0.2.74 to solve issues around default values. (apache#3469) The latest version of helper-all fixes the issues seen before w.r.t. default values, so we can now revert the code and the *.avsc files back to how they used to be, with two minor exceptions: 1. Check Schema equality using their .toString() representations. Doing it the old way works for two out of the three instances, but one of them fails, for reasons I haven't figured out yet. 2. Add a `"default":null` piece to recursive_schema_1_converted.avsc. This is harmless, and is caused by the fact that the compatibility helper always adds it if it's a valid default for the schema. See the comments for FieldBuilder19.setDefault(): https://github.com/linkedin/avro-util/blob/b9e89c55980ea8e5fd3c8d8da362d7195dd2a99c/helper/impls/helper-impl-19/src/main/java/com/linkedin/avroutil1/compatibility/avro19/FieldBuilder19.java#L69 To verify that the files are otherwise the same as before: ``` $ for file in gobblin-core-base/src/test/resources/converter/*.avsc; do > git show 928e018:$file > /tmp/before > diff <(jq . </tmp/before) <(jq . <$file) > done ``` * Merging apache/gobblin master with avro_1_9 * Use helper-all v0.2.74 to solve issues around default values. (apache#3469) The latest version of helper-all fixes the issues seen before w.r.t. default values, so we can now revert the code and the *.avsc files back to how they used to be, with two minor exceptions: 1. Check Schema equality using their .toString() representations. Doing it the old way works for two out of the three instances, but one of them fails, for reasons I haven't figured out yet. 2. Add a `"default":null` piece to recursive_schema_1_converted.avsc. This is harmless, and is caused by the fact that the compatibility helper always adds it if it's a valid default for the schema. See the comments for FieldBuilder19.setDefault(): https://github.com/linkedin/avro-util/blob/b9e89c55980ea8e5fd3c8d8da362d7195dd2a99c/helper/impls/helper-impl-19/src/main/java/com/linkedin/avroutil1/compatibility/avro19/FieldBuilder19.java#L69 To verify that the files are otherwise the same as before: ``` $ for file in gobblin-core-base/src/test/resources/converter/*.avsc; do > git show 928e018:$file > /tmp/before > diff <(jq . </tmp/before) <(jq . <$file) > done ``` * Added deprecated json method using AvroCompatibilityHelper * Removed unused import and replaced Integer.valueOf with Integer.parseInt * Exclude com.linkedin.hive dependency from gradle build files similar to org.apache.hive * Repalce direct avro field creation with AvroCompatibilityHelper.createSchemaField * Removed extra dependency. Addressed review comment - removed jcenter() repository * Upgrade AvroCompatHelper version * Removed the code that are actually moved to AvroHiveTypeUtils.java in the master branch * Addresssed review comments: replaced getObjectProps/getObjectProp with AvroCompatibilityHelper methods * Fix for test failure Co-authored-by: Lei <[email protected]> Co-authored-by: Sreeram Ramachandran <[email protected]>
1 parent 0936016 commit 7eaa6e8

File tree

68 files changed

+569
-179
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+569
-179
lines changed

gobblin-cluster/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ dependencies {
5050
compile (externalDependency.helix) {
5151
exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
5252
}
53+
compile externalDependency.jacksonMapperAsl
5354

5455
runtimeOnly project(":gobblin-modules:gobblin-service-kafka")
5556

gobblin-compaction/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ dependencies {
4646

4747
runtimeOnly(externalDependency.hiveService) {
4848
exclude group: 'org.apache.hive', module: 'hive-exec'
49+
exclude group: 'com.linkedin.hive', module: 'hive-exec'
4950
}
5051
runtimeOnly externalDependency.hiveJdbc
5152
runtimeOnly externalDependency.hiveMetastore

gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.gobblin.compaction.mapreduce.avro;
1919

20+
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
2021
import java.io.IOException;
2122
import java.util.ArrayList;
2223
import java.util.List;
@@ -27,6 +28,7 @@
2728
import org.apache.avro.generic.GenericRecord;
2829
import org.apache.hadoop.conf.Configuration;
2930
import org.codehaus.jackson.JsonFactory;
31+
import org.codehaus.jackson.JsonNode;
3032
import org.codehaus.jackson.JsonParser;
3133
import org.codehaus.jackson.map.ObjectMapper;
3234
import org.codehaus.jackson.node.ObjectNode;
@@ -79,7 +81,9 @@ public List<String> getDeltaFieldNames(GenericRecord record) {
7981
private List<String> getDeltaFieldNamesForNewSchema(Schema originalSchema) {
8082
List<String> deltaFields = new ArrayList<>();
8183
for (Field field : originalSchema.getFields()) {
82-
String deltaAttributeField = field.getJsonProp(this.attributeField).getValueAsText();
84+
// Avro 1.9 compatible change - replaced deprecated public api getJsonProp with AvroCompatibilityHelper methods
85+
String deltaAttributeField = AvroCompatibilityHelper.getFieldPropAsJsonString(field, this.attributeField,
86+
true, false);
8387
ObjectNode objectNode = getDeltaPropValue(deltaAttributeField);
8488
if (objectNode == null || objectNode.get(this.deltaPropName) == null) {
8589
continue;
@@ -98,7 +102,8 @@ private ObjectNode getDeltaPropValue(String json) {
98102
JsonParser jp = jf.createJsonParser(json);
99103
ObjectMapper objMap = new ObjectMapper(jf);
100104
jp.setCodec(objMap);
101-
return (ObjectNode) jp.readValueAsTree();
105+
JsonNode jsonNode = jp.readValueAsTree();
106+
return (ObjectNode) objMap.readTree(jsonNode.asText());
102107
} catch (IOException e) {
103108
return null;
104109
}

gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.avro.mapred.AvroKey;
3838
import org.apache.avro.mapred.AvroValue;
3939
import org.apache.avro.mapreduce.AvroJob;
40+
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
4041
import org.apache.commons.io.FilenameUtils;
4142
import org.apache.gobblin.compaction.dataset.Dataset;
4243
import org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
@@ -198,7 +199,8 @@ public static Optional<Schema> getKeySchemaFromRecord(Schema record) {
198199
for (Field field : record.getFields()) {
199200
Optional<Schema> newFieldSchema = getKeySchema(field);
200201
if (newFieldSchema.isPresent()) {
201-
fields.add(new Field(field.name(), newFieldSchema.get(), field.doc(), field.defaultValue()));
202+
fields.add(AvroCompatibilityHelper.createSchemaField(field.name(), newFieldSchema.get(), field.doc(),
203+
AvroUtils.getCompatibleDefaultValue(field)));
202204
}
203205
}
204206
if (!fields.isEmpty()) {

gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProviderTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.testng.Assert;
2626
import org.testng.annotations.Test;
2727

28-
import avro.shaded.com.google.common.collect.Lists;
28+
import com.google.common.collect.Lists;
2929

3030
import static org.mockito.Mockito.mock;
3131
import static org.mockito.Mockito.when;

gobblin-core-base/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ dependencies {
3131
compile externalDependency.avroMapredH2
3232
compile externalDependency.commonsCodec
3333
compile externalDependency.avro
34+
compile externalDependency.avroCompatHelper
3435
compile externalDependency.guava
3536
compile externalDependency.slf4j
3637
compile externalDependency.typesafeConfig

gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java

+12-10
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,15 @@
2222

2323
import org.apache.avro.Schema;
2424
import org.apache.avro.Schema.Field;
25-
import org.codehaus.jackson.JsonNode;
25+
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
2626

2727
import com.google.common.base.Preconditions;
2828
import com.google.common.base.Splitter;
2929
import com.google.common.collect.Lists;
3030
import com.google.common.collect.Maps;
3131

32-
import static org.apache.gobblin.util.AvroUtils.convertFieldToSchemaWithProps;
32+
import org.apache.gobblin.util.AvroSchemaUtils;
33+
import org.apache.gobblin.util.AvroUtils;
3334

3435

3536
/**
@@ -108,7 +109,7 @@ private Schema removeFields(Schema schema, Map<String, Schema> schemaMap) {
108109
private Schema removeFieldsFromRecords(Schema schema, Map<String, Schema> schemaMap) {
109110

110111
Schema newRecord = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
111-
convertFieldToSchemaWithProps(schema.getJsonProps(), newRecord);
112+
AvroSchemaUtils.copySchemaProperties(schema, newRecord);
112113

113114
// Put an incomplete schema into schemaMap to avoid re-processing a recursive field.
114115
// The fields in the incomplete schema will be populated once the current schema is completely processed.
@@ -119,15 +120,16 @@ private Schema removeFieldsFromRecords(Schema schema, Map<String, Schema> schema
119120
if (!this.shouldRemove(field)) {
120121
Field newField;
121122
if (this.children.containsKey(field.name())) {
122-
newField = new Field(field.name(), this.children.get(field.name()).removeFields(field.schema(), schemaMap),
123-
field.doc(), field.defaultValue());
123+
newField = AvroCompatibilityHelper.createSchemaField(field.name(),
124+
this.children.get(field.name()).removeFields(field.schema(), schemaMap),
125+
field.doc(), AvroUtils.getCompatibleDefaultValue(field));
124126
} else {
125-
newField = new Field(field.name(), DO_NOTHING_INSTANCE.removeFields(field.schema(), schemaMap), field.doc(),
126-
field.defaultValue());
127-
}
128-
for (Map.Entry<String, JsonNode> stringJsonNodeEntry : field.getJsonProps().entrySet()) {
129-
newField.addProp(stringJsonNodeEntry.getKey(), stringJsonNodeEntry.getValue());
127+
newField = AvroCompatibilityHelper.createSchemaField(field.name(),
128+
DO_NOTHING_INSTANCE.removeFields(field.schema(), schemaMap), field.doc(),
129+
AvroUtils.getCompatibleDefaultValue(field));
130130
}
131+
// Avro 1.9 compatible change - replaced deprecated public api getJsonProps with AvroCompatibilityHelper methods
132+
AvroSchemaUtils.copyFieldProperties(field, newField);
131133
newFields.add(newField);
132134
}
133135
}

gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.avro.Schema.Field;
2727
import org.apache.avro.generic.GenericData;
2828
import org.apache.avro.generic.GenericRecord;
29+
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
2930

3031
import com.google.common.base.Joiner;
3132
import com.google.common.base.Preconditions;
@@ -85,10 +86,11 @@ public Converter init(WorkUnitState workUnitState) {
8586
String curFieldName = field.name();
8687
if (!field.schema().getType().equals(Schema.Type.MAP)) {
8788
if (fieldsRenameMap.containsKey(curFieldName)) {
88-
newFields.add(
89-
new Schema.Field(fieldsRenameMap.get(curFieldName), field.schema(), field.doc(), field.defaultValue()));
89+
newFields.add(AvroCompatibilityHelper.createSchemaField(fieldsRenameMap.get(curFieldName), field.schema(),
90+
field.doc(), AvroUtils.getCompatibleDefaultValue(field)));
9091
} else {
91-
newFields.add(new Schema.Field(curFieldName, field.schema(), field.doc(), field.defaultValue()));
92+
newFields.add(AvroCompatibilityHelper.createSchemaField(curFieldName, field.schema(), field.doc(),
93+
AvroUtils.getCompatibleDefaultValue(field)));
9294
}
9395
this.nonMapFields.add(curFieldName);
9496
} else {
@@ -102,7 +104,7 @@ public Converter init(WorkUnitState workUnitState) {
102104
for (String fieldToFlatten : ConfigUtils.getStringList(config, FIELDS_TO_FLATTEN)) {
103105
String newFieldName =
104106
this.fieldsRenameMap.containsKey(fieldToFlatten) ? this.fieldsRenameMap.get(fieldToFlatten) : fieldToFlatten;
105-
newFields.add(new Field(newFieldName, Schema.create(Schema.Type.STRING), "", null));
107+
newFields.add(AvroCompatibilityHelper.createSchemaField(newFieldName, Schema.create(Schema.Type.STRING), "", null));
106108
}
107109

108110
return this;

gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import com.typesafe.config.Config;
3030
import com.typesafe.config.ConfigFactory;
3131

32-
import avro.shaded.com.google.common.base.Throwables;
32+
import com.google.common.base.Throwables;
3333
import lombok.extern.slf4j.Slf4j;
3434

3535
import org.apache.gobblin.configuration.SourceState;

gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemoverTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,16 @@ public class AvroSchemaFieldRemoverTest {
3737
public void testRemoveFields() throws IllegalArgumentException, IOException {
3838
Schema convertedSchema1 = convertSchema("/converter/recursive_schema_1.avsc", "YwchQiH.OjuzrLOtmqLW");
3939
Schema expectedSchema1 = parseSchema("/converter/recursive_schema_1_converted.avsc");
40-
Assert.assertEquals(convertedSchema1, expectedSchema1);
40+
Assert.assertEquals(convertedSchema1.toString(), expectedSchema1.toString());
4141

4242
Schema convertedSchema2 =
4343
convertSchema("/converter/recursive_schema_2.avsc", "FBuKC.wIINqII.lvaerUEKxBQUWg,eFQjDj.TzuYZajb");
4444
Schema expectedSchema2 = parseSchema("/converter/recursive_schema_2_converted.avsc");
45-
Assert.assertEquals(convertedSchema2, expectedSchema2);
45+
Assert.assertEquals(convertedSchema2.toString(), expectedSchema2.toString());
4646

4747
Schema convertedSchema3 = convertSchema("/converter/recursive_schema_2.avsc", "field.that.does.not.exist");
4848
Schema expectedSchema3 = parseSchema("/converter/recursive_schema_2_not_converted.avsc");
49-
Assert.assertEquals(convertedSchema3, expectedSchema3);
49+
Assert.assertEquals(convertedSchema3.toString(), expectedSchema3.toString());
5050
}
5151

5252
private Schema parseSchema(String schemaFile) throws IOException {

gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverterTest.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,13 @@ public void testSchemaConversion()
4545
Schema output = converter.convertSchema(
4646
new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc")),
4747
workUnitState);
48-
Assert.assertEquals(output, new Schema.Parser().parse(
48+
Schema parsedSchema = new Schema.Parser().parse(
4949
"{\"type\":\"record\",\"name\":\"GobblinTrackingEvent\",\"namespace\":\"org.apache.gobblin.metrics\",\"fields\":"
5050
+ "[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Time at which event was created.\",\"default\":0},"
5151
+ "{\"name\":\"namespace\",\"type\":[\"string\",\"null\"],\"doc\":\"Namespace used for filtering of events.\"},"
5252
+ "{\"name\":\"name\",\"type\":\"string\",\"doc\":\"Event name.\"},{\"name\":\"field1\",\"type\":\"string\",\"doc\":\"\"},"
53-
+ "{\"name\":\"field2\",\"type\":\"string\",\"doc\":\"\"}]}"));
53+
+ "{\"name\":\"field2\",\"type\":\"string\",\"doc\":\"\"}]}");
54+
Assert.assertEquals(output.toString(), parsedSchema.toString());
5455

5556
props.put(GobblinTrackingEventFlattenFilterConverter.class.getSimpleName() + "."
5657
+ GobblinTrackingEventFlattenFilterConverter.FIELDS_RENAME_MAP, "name:eventName,field1:field3");
@@ -61,11 +62,12 @@ public void testSchemaConversion()
6162
Schema output2 = converter.convertSchema(
6263
new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc")),
6364
workUnitState2);
64-
Assert.assertEquals(output2, new Schema.Parser().parse(
65+
parsedSchema = new Schema.Parser().parse(
6566
"{\"type\":\"record\",\"name\":\"GobblinTrackingEvent\",\"namespace\":\"org.apache.gobblin.metrics\",\"fields\":"
6667
+ "[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Time at which event was created.\",\"default\":0},"
6768
+ "{\"name\":\"namespace\",\"type\":[\"string\",\"null\"],\"doc\":\"Namespace used for filtering of events.\"},"
6869
+ "{\"name\":\"eventName\",\"type\":\"string\",\"doc\":\"Event name.\"},{\"name\":\"field3\",\"type\":\"string\",\"doc\":\"\"},"
69-
+ "{\"name\":\"field2\",\"type\":\"string\",\"doc\":\"\"}]}"));
70+
+ "{\"name\":\"field2\",\"type\":\"string\",\"doc\":\"\"}]}");
71+
Assert.assertEquals(output2.toString(), parsedSchema.toString());
7072
}
7173
}

0 commit comments

Comments
 (0)