Skip to content

Commit 8b89075

Browse files
authored
[GOBBLIN-1957] GobblinOrcwriter improvements for large records (apache#3828)
* WIP * Optimization to limit batchsize based on large record sizes * Address review
1 parent 70e480f commit 8b89075

File tree

3 files changed

+73
-12
lines changed

3 files changed

+73
-12
lines changed

gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
9191
private int orcFileWriterRowsBetweenCheck;
9292
private long orcStripeSize;
9393
private int maxOrcBatchSize;
94+
private int batchSizeRowCheckFactor;
95+
private boolean enableLimitBufferSizeOrcStripe;
9496

9597
private int concurrentWriterTasks;
9698
private long orcWriterStripeSizeBytes;
@@ -109,6 +111,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
109111
this.typeDescription = getOrcSchema();
110112
this.selfTuningWriter = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, false);
111113
this.validateORCAfterClose = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_VALIDATE_FILE_AFTER_CLOSE, false);
114+
this.batchSizeRowCheckFactor = properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR, GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR);
112115
this.maxOrcBatchSize = properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE,
113116
GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE);
114117
this.batchSize = this.selfTuningWriter ?
@@ -133,6 +136,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
133136
GobblinOrcWriterConfigs.DEFAULT_MIN_ORC_WRITER_ROWCHECK);
134137
this.orcFileWriterMaxRowsBetweenCheck = properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_MAX_ROWCHECK,
135138
GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_ROWCHECK);
139+
this.enableLimitBufferSizeOrcStripe = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_ENABLE_BUFFER_LIMIT_ORC_STRIPE, false);
136140
// Create file-writer
137141
this.writerConfig = new Configuration();
138142
// Populate job Configurations into Conf as well so that configurations related to ORC writer can be tuned easily.
@@ -312,11 +316,18 @@ void tuneBatchSize(long averageSizePerRecord) throws IOException {
312316
this.currentOrcWriterMaxUnderlyingMemory = Math.max(this.currentOrcWriterMaxUnderlyingMemory, orcFileWriter.estimateMemory());
313317
}
314318
long maxMemoryInFileWriter = Math.max(currentOrcWriterMaxUnderlyingMemory, prevOrcWriterMaxUnderlyingMemory);
315-
316319
int newBatchSize = (int) ((this.availableMemory*1.0 / currentPartitionedWriters * this.rowBatchMemoryUsageFactor - maxMemoryInFileWriter
317320
- this.estimatedBytesAllocatedConverterMemory) / averageSizePerRecord);
321+
322+
if (this.enableLimitBufferSizeOrcStripe) {
323+
// For large records, prevent the batch size from greatly exceeding the size of a stripe as the native ORC Writer will flush its buffer after a stripe is filled
324+
int maxRecordsPerStripeSize = (int) Math.round(this.orcWriterStripeSizeBytes * 1.0 / averageSizePerRecord);
325+
this.orcFileWriterMaxRowsBetweenCheck = Math.min(this.orcFileWriterMaxRowsBetweenCheck, maxRecordsPerStripeSize);
326+
this.maxOrcBatchSize = Math.min(this.maxOrcBatchSize, maxRecordsPerStripeSize);
327+
}
318328
// Handle scenarios where new batch size can be 0 or less due to overestimating memory used by other components
319329
newBatchSize = Math.min(Math.max(1, newBatchSize), this.maxOrcBatchSize);
330+
320331
if (Math.abs(newBatchSize - this.batchSize) > GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_TUNE_BATCHSIZE_SENSITIVITY * this.batchSize) {
321332
// Add a factor when tuning up the batch size to prevent large sudden increases in memory usage
322333
if (newBatchSize > this.batchSize) {
@@ -337,7 +348,7 @@ void tuneBatchSize(long averageSizePerRecord) throws IOException {
337348
void initializeOrcFileWriter() {
338349
try {
339350
this.orcFileWriterRowsBetweenCheck = Math.max(
340-
Math.min(this.batchSize * GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR, this.orcFileWriterMaxRowsBetweenCheck),
351+
Math.min(this.batchSize * this.batchSizeRowCheckFactor, this.orcFileWriterMaxRowsBetweenCheck),
341352
this.orcFileWriterMinRowsBetweenCheck
342353
);
343354
this.writerConfig.set(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), String.valueOf(this.orcFileWriterRowsBetweenCheck));

gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ public class GobblinOrcWriterConfigs {
3939
* Max buffer size of the Gobblin ORC Writer that it can be tuned to
4040
*/
4141
public static final String ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE = ORC_WRITER_PREFIX + "auto.selfTune.max.batch.size";
42+
/**
43+
* The multiplier applied to the Gobblin ORC Writer batch size to derive the native ORC Writer's rows between memory checks
44+
*/
45+
public static final String ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR = "auto.selfTune.rowCheck.factor";
46+
4247
/**
4348
* How often should the Gobblin ORC Writer check for tuning
4449
*/
@@ -60,6 +65,12 @@ public class GobblinOrcWriterConfigs {
6065
*/
6166
public static final String ORC_WRITER_MAX_ROWCHECK = ORC_WRITER_PREFIX + "max.rows.between.memory.checks";
6267

68+
/**
69+
* Enables capping the buffer size of both the native ORC writer and the Gobblin ORC writer at the size of a stripe divided by the estimated
70+
* size of each record. This guards against the case where extremely large records cause the buffer sizes to dominate the memory usage
71+
*/
72+
public static final String ORC_WRITER_ENABLE_BUFFER_LIMIT_ORC_STRIPE = ORC_WRITER_PREFIX + "auto.selfTune.max.buffer.orc.stripe";
73+
6374
public static final String ORC_WRITER_INSTRUMENTED = ORC_WRITER_PREFIX + "instrumented";
6475

6576
public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
@@ -70,10 +81,9 @@ public class GobblinOrcWriterConfigs {
7081
*/
7182
public static final int DEFAULT_CONCURRENT_WRITERS = 3;
7283
public static final double DEFAULT_ORC_WRITER_BATCHSIZE_MEMORY_USAGE_FACTOR = 0.3;
73-
/**
74-
* The ratio of native ORC Writer buffer size to Gobblin ORC Writer buffer size
75-
*/
84+
7685
public static final int DEFAULT_ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR = 5;
86+
7787
public static final int DEFAULT_MAX_ORC_WRITER_BATCH_SIZE = DEFAULT_ORC_WRITER_BATCH_SIZE;
7888
public static final int DEFAULT_ORC_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK = 500;
7989
/**

gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java

+47-7
Original file line numberDiff line numberDiff line change
@@ -268,21 +268,23 @@ public void testSelfTuneRowBatchCalculation() throws Exception {
268268
// Force a larger initial batchSize that can be tuned down
269269
orcWriter.batchSize = 10;
270270
orcWriter.rowBatch.ensureSize(10);
271+
// Given that the available memory is very high, the resulting batchsize should be maxed out
271272
orcWriter.availableMemory = 100000000;
272-
// Given the amount of available memory and a low stripe size, and estimated rowBatchSize, the resulting batchsize should be maxed out
273+
// Consider that the batch size incrementally increases based on the difference between target and current batchsize (10)
273274
orcWriter.tuneBatchSize(10);
274-
System.out.println(orcWriter.batchSize);
275-
// Take into account that increases in batchsize are multiplied by a factor to prevent large jumps in batchsize
276-
Assert.assertTrue(orcWriter.batchSize == (GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCH_SIZE+10)/2);
275+
Assert.assertEquals(orcWriter.batchSize, 505);
276+
orcWriter.tuneBatchSize(10);
277+
Assert.assertEquals(orcWriter.batchSize, 752);
278+
277279
orcWriter.availableMemory = 100;
278280
orcWriter.tuneBatchSize(10);
279281
// Given that the amount of available memory is low, the resulting batchsize should be 1
280-
Assert.assertTrue(orcWriter.batchSize == 1);
282+
Assert.assertEquals(orcWriter.batchSize,1);
281283
orcWriter.availableMemory = 10000;
282284
orcWriter.rowBatch.ensureSize(10000);
283285
// Since the rowBatch is large, the resulting batchsize should still be 1 even with more memory
284286
orcWriter.tuneBatchSize(10);
285-
Assert.assertTrue(orcWriter.batchSize == 1);
287+
Assert.assertEquals(orcWriter.batchSize, 1);
286288
}
287289

288290
@Test
@@ -322,4 +324,42 @@ public void testStatePersistenceWhenClosingWriter() throws IOException {
322324
Assert.assertNotNull(dummyState.getProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_NATIVE_WRITER_MEMORY));
323325
Assert.assertNotNull(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute());
324326
}
325-
}
327+
328+
@Test
329+
public void testSelfTuneRowBatchCalculationWithStripeMax() throws Exception {
330+
Schema schema =
331+
new Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("orc_writer_test/schema.avsc"));
332+
List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(), schema, "orc_writer_test/data_multi.json");
333+
334+
// Mock WriterBuilder, bunch of mocking behaviors to work-around precondition checks in writer builder
335+
FsDataWriterBuilder<Schema, GenericRecord> mockBuilder =
336+
(FsDataWriterBuilder<Schema, GenericRecord>) Mockito.mock(FsDataWriterBuilder.class);
337+
when(mockBuilder.getSchema()).thenReturn(schema);
338+
339+
State dummyState = new WorkUnit();
340+
String stagingDir = Files.createTempDir().getAbsolutePath();
341+
String outputDir = Files.createTempDir().getAbsolutePath();
342+
dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
343+
dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH, "selfTune");
344+
dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
345+
dummyState.setProp(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, "true");
346+
dummyState.setProp(OrcConf.STRIPE_SIZE.getAttribute(), "100");
347+
dummyState.setProp(GobblinOrcWriterConfigs.ORC_WRITER_ENABLE_BUFFER_LIMIT_ORC_STRIPE, "true");
348+
when(mockBuilder.getFileName(dummyState)).thenReturn("file");
349+
350+
// Having a closer to manage the life-cycle of the writer object.
351+
Closer closer = Closer.create();
352+
GobblinOrcWriter orcWriter = closer.register(new GobblinOrcWriter(mockBuilder, dummyState));
353+
// Force a larger initial batchSize that can be tuned down
354+
orcWriter.batchSize = 10;
355+
orcWriter.rowBatch.ensureSize(10);
356+
orcWriter.availableMemory = 100000000;
357+
// Since the stripe size is 100, the resulting batchsize should be 10 (100/10)
358+
orcWriter.tuneBatchSize(10);
359+
Assert.assertEquals(orcWriter.batchSize,10);
360+
361+
// Increasing the estimated record size should decrease the max batch size
362+
orcWriter.tuneBatchSize(100);
363+
Assert.assertEquals(orcWriter.batchSize,1);
364+
}
365+
}

0 commit comments

Comments
 (0)