Skip to content

Commit 1431597

Browse files
authored
Merge pull request #6074 from gchq/dependabot/cargo/rust/datafusion-51.0.0
Bump datafusion from 50.2.0 to 51.0.0 in /rust
2 parents c00d325 + 2c35dd2 commit 1431597

File tree

8 files changed

+850
-1170
lines changed

8 files changed

+850
-1170
lines changed

java/compaction/compaction-datafusion/src/main/java/sleeper/compaction/datafusion/DataFusionCompactionRunner.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
1919
import org.apache.hadoop.conf.Configuration;
20+
import org.apache.hadoop.fs.FileSystem;
2021
import org.apache.hadoop.fs.Path;
2122
import org.apache.parquet.hadoop.ParquetWriter;
2223
import org.slf4j.Logger;
@@ -66,14 +67,17 @@ public DataFusionCompactionRunner(DataFusionAwsConfig awsConfig, Configuration h
6667
@Override
6768
public RowsProcessed compact(CompactionJob job, TableProperties tableProperties, Region region) throws IOException {
6869
jnr.ffi.Runtime runtime = jnr.ffi.Runtime.getRuntime(DataFusionCompactionFunctions.INSTANCE);
69-
7070
FFICommonConfig params = createCompactionParams(job, tableProperties, region, awsConfig, runtime);
7171

7272
RowsProcessed result = invokeDataFusion(job, params, runtime);
7373

74-
if (result.getRowsWritten() < 1) {
74+
// Get the filesystem object
75+
FileSystem fs = FileSystem.get(hadoopConf);
76+
Path outputPath = new Path(job.getOutputFile());
77+
78+
if (result.getRowsWritten() < 1 && !fs.exists(outputPath)) {
7579
try (ParquetWriter<Row> writer = ParquetRowWriterFactory.createParquetRowWriter(
76-
new Path(job.getOutputFile()), tableProperties, hadoopConf)) {
80+
outputPath, tableProperties, hadoopConf)) {
7781
// Write an empty file. This should be temporary, as we expect DataFusion to add support for this.
7882
// See the test should_merge_empty_files in compaction_test.rs
7983
}

java/compaction/compaction-datafusion/src/test/java/sleeper/compaction/datafusion/DataFusionCompactionRunnerIT.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,11 @@ void shouldMergeTwoEmptyFiles() throws Exception {
261261
// When
262262
runTask(job);
263263

264+
// The should_merge_empty_files test in rust/sleeper_core/tests/compaction_test.rs asserts that no result file
265+
// is written when empty files are compacted. This differs here as it appears DataFusion will not always
266+
// write an empty results file. See comment in that file. We ensure a result file is always written on the Java
267+
// side in DataFusionCompactionRunner.
268+
264269
// Then
265270
assertThat(getRowsProcessed(job)).isEqualTo(new RowsProcessed(0, 0));
266271
assertThat(readDataFile(job.getOutputFile())).isEmpty();

0 commit comments

Comments
 (0)