Skip to content

Commit 26fd244

Browse files
authored
[GOBBLIN-1749] Add dependency for handling xz-compressed Avro file (apache#3609)
* Add dependency on xz for handling xz-compressed Avro files
* Fix unit test to ensure all codecs are correctly supported
* Update AvroHdfsDataWriter's documentation to cover all compression codecs
1 parent 7b6c5fe commit 26fd244

File tree

4 files changed

+25
-16
lines changed

4 files changed

+25
-16
lines changed

gobblin-core/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ dependencies {
5757
compile externalDependency.oltu
5858
compile externalDependency.opencsv
5959
compile externalDependency.hadoopHdfs
60+
compile externalDependency.xz
6061
runtimeOnly externalDependency.protobuf
6162

6263
testRuntime externalDependency.hadoopAws

gobblin-core/src/test/java/org/apache/gobblin/writer/AvroHdfsDataWriterTest.java

+18-11
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.testng.Assert;
3333
import org.testng.annotations.AfterClass;
3434
import org.testng.annotations.BeforeClass;
35+
import org.testng.annotations.DataProvider;
3536
import org.testng.annotations.Test;
3637

3738
import com.google.gson.Gson;
@@ -54,7 +55,6 @@ public class AvroHdfsDataWriterTest {
5455
private static final Type FIELD_ENTRY_TYPE = new TypeToken<Map<String, Object>>() {}.getType();
5556

5657
private Schema schema;
57-
private DataWriter<GenericRecord> writer;
5858
private String filePath;
5959
private State properties;
6060

@@ -83,24 +83,31 @@ public void setUp() throws Exception {
8383
properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, TestConstants.TEST_OUTPUT_DIR);
8484
properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, this.filePath);
8585
properties.setProp(ConfigurationKeys.WRITER_FILE_NAME, TestConstants.TEST_FILE_NAME);
86+
}
8687

87-
// Build a writer to write test records
88-
this.writer = new AvroDataWriterBuilder().writeTo(Destination.of(Destination.DestinationType.HDFS, properties))
89-
.writeInFormat(WriterOutputFormat.AVRO).withWriterId(TestConstants.TEST_WRITER_ID).withSchema(this.schema)
90-
.withBranches(1).forBranch(0).build();
88+
@DataProvider(name = "codecs")
89+
private String[] codecs() {
90+
return new String[]{"null", "deflate", "snappy", "bzip2", "xz", "zstandard"};
9191
}
9292

93-
@Test
94-
public void testWrite() throws IOException {
93+
@Test(dataProvider = "codecs")
94+
public void testWrite(String codec) throws IOException {
95+
properties.setProp(ConfigurationKeys.WRITER_CODEC_TYPE, codec);
96+
DataWriterBuilder<Schema, GenericRecord> builder = new AvroDataWriterBuilder()
97+
.writeTo(Destination.of(Destination.DestinationType.HDFS, properties))
98+
.writeInFormat(WriterOutputFormat.AVRO).withWriterId(TestConstants.TEST_WRITER_ID)
99+
.withSchema(this.schema).withBranches(1).forBranch(0);
100+
DataWriter<GenericRecord> writer = builder.build();
101+
95102
// Write all test records
96103
for (String record : TestConstants.JSON_RECORDS) {
97-
this.writer.write(convertRecord(record));
104+
writer.write(convertRecord(record));
98105
}
99106

100-
Assert.assertEquals(this.writer.recordsWritten(), 3);
107+
Assert.assertEquals(writer.recordsWritten(), 3);
101108

102-
this.writer.close();
103-
this.writer.commit();
109+
writer.close();
110+
writer.commit();
104111

105112
File outputFile =
106113
new File(TestConstants.TEST_OUTPUT_DIR + Path.SEPARATOR + this.filePath, TestConstants.TEST_FILE_NAME);

gobblin-docs/sinks/AvroHdfsDataWriter.md

+4-4
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ For more info, see [`AvroHdfsDataWriter`](https://github.com/apache/gobblin/sear
1515
# Configuration
1616

1717

18-
| Key | Type | Description | Default Value |
19-
|-----|------|-------------|---------------|
20-
| writer.codec.type | One of null,deflate,snappy,bzip2,xz | Type of the compression codec | deflate |
21-
| writer.deflate.level | 1-9 | The compression level for the "deflate" codec | 9 |
18+
| Key | Type | Description | Default Value |
19+
|----------------------|-----------------------------------------------|-----------------------------------------------|---------------|
20+
| writer.codec.type | One of null,deflate,snappy,bzip2,xz,zstandard | Type of the compression codec | deflate |
21+
| writer.deflate.level | 1-9 | The compression level for the "deflate" codec | 9 |
2222

gradle/scripts/dependencyDefinitions.gradle

+2-1
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,8 @@ ext.externalDependency = [
210210
],
211211
"postgresConnector": "org.postgresql:postgresql:42.1.4",
212212
"testContainers": "org.testcontainers:testcontainers:1.17.3",
213-
"testContainersMysql": "org.testcontainers:mysql:1.17.3"
213+
"testContainersMysql": "org.testcontainers:mysql:1.17.3",
214+
"xz": "org.tukaani:xz:1.8"
214215
]
215216

216217
if (!isDefaultEnvironment)

0 commit comments

Comments
 (0)