Skip to content

Commit 8149a17

Browse files
authored
Merge pull request #6062 from gchq/6061-remove-datafusion-fallback-to-java
Issue 6061 - Remove DataFusion query fallback to Java
2 parents 716a986 + 660eefa commit 8149a17

File tree

3 files changed

+18
-28
lines changed

3 files changed

+18
-28
lines changed

java/clients/src/main/java/sleeper/clients/api/aws/SleeperClientQueryProvider.java

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@
2323
import sleeper.foreign.datafusion.DataFusionAwsConfig;
2424
import sleeper.parquet.utils.TableHadoopConfigurationProvider;
2525
import sleeper.query.core.rowretrieval.LeafPartitionRowRetrieverProvider;
26+
import sleeper.query.core.rowretrieval.QueryEngineSelector;
2627
import sleeper.query.datafusion.DataFusionQueryContext;
2728
import sleeper.query.runner.rowretrieval.LeafPartitionRowRetrieverImpl;
2829

@@ -66,8 +67,9 @@ static SleeperClientQueryProvider withThreadPoolForEachClient(int threadPoolSize
6667
ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
6768
DataFusionQueryContext dataFusionContext = DataFusionQueryContext.createIfLoaded(RootAllocator::new);
6869
LeafPartitionRowRetrieverProvider javaProvider = new LeafPartitionRowRetrieverImpl.Provider(executorService, hadoopProvider);
70+
LeafPartitionRowRetrieverProvider dataFusionProvider = dataFusionContext.createDataFusionProvider(DataFusionAwsConfig::getDefault);
6971
return ShutdownWrapper.shutdown(
70-
dataFusionContext.createQueryEngineSelectorWithFallback(DataFusionAwsConfig::getDefault, javaProvider),
72+
QueryEngineSelector.javaAndDataFusion(javaProvider, dataFusionProvider),
7173
new UncheckedAutoCloseables(List.of(
7274
dataFusionContext::close,
7375
executorService::shutdown)));
@@ -95,7 +97,8 @@ private PersistentThreadPool(ExecutorService executorService) {
9597
@Override
9698
public ShutdownWrapper<LeafPartitionRowRetrieverProvider> getRowRetrieverProvider(TableHadoopConfigurationProvider hadoopProvider) {
9799
LeafPartitionRowRetrieverProvider javaProvider = new LeafPartitionRowRetrieverImpl.Provider(executorService, hadoopProvider);
98-
return ShutdownWrapper.noShutdown(dataFusionContext.createQueryEngineSelectorWithFallback(DataFusionAwsConfig::getDefault, javaProvider));
100+
LeafPartitionRowRetrieverProvider dataFusionProvider = dataFusionContext.createDataFusionProvider(DataFusionAwsConfig::getDefault);
101+
return ShutdownWrapper.noShutdown(QueryEngineSelector.javaAndDataFusion(javaProvider, dataFusionProvider));
99102
}
100103

101104
@Override

java/query/query-datafusion/src/main/java/sleeper/query/datafusion/DataFusionQueryContext.java

Lines changed: 7 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -22,12 +22,12 @@
2222
import sleeper.foreign.bridge.FFIContext;
2323
import sleeper.foreign.datafusion.DataFusionAwsConfig;
2424
import sleeper.query.core.rowretrieval.LeafPartitionRowRetrieverProvider;
25-
import sleeper.query.core.rowretrieval.QueryEngineSelector;
2625

2726
import java.util.function.Supplier;
2827

2928
/**
30-
* A wrapper for the DataFusion FFI context. Allows for fallback if DataFusion could not be loaded.
29+
* A wrapper for the DataFusion FFI context. Allows for delayed failure if DataFusion could not be loaded, in case you
30+
* only want to use the Java data engine.
3131
*/
3232
public class DataFusionQueryContext implements AutoCloseable {
3333

@@ -44,7 +44,7 @@ private DataFusionQueryContext(FFIContext<DataFusionQueryFunctions> context, Buf
4444
/**
4545
* Creates the DataFusion FFI context and Arrow allocator if the DataFusion functions were loaded. The FFI context
4646
* and Arrow allocator will be closed when the query context is closed. If the DataFusion functions were not loaded,
47-
* this allows falling back to the Java implementation.
47+
* this avoids immediate failure in case you only want to use the Java data engine.
4848
*
4949
* @param allocator a constructor for the Arrow allocator
5050
* @return the context
@@ -56,35 +56,19 @@ public static DataFusionQueryContext createIfLoaded(Supplier<BufferAllocator> al
5656
}
5757

5858
/**
59-
* Creates a context where no DataFusion functions are used. Will always fall back to the Java implementation.
59+
* Creates a context where no DataFusion functions are used. Supports cases where an instance of this class is
60+
* required but we will always use the Java data engine.
6061
*
6162
* @return the context
6263
*/
6364
public static DataFusionQueryContext none() {
6465
return new DataFusionQueryContext(null, null);
6566
}
6667

67-
/**
68-
* Creates a row retriever provider for use in queries, using the Java or DataFusion implementation depending on
69-
* configuration. If the DataFusion functions could not be loaded, the Java implementation will always be used.
70-
*
71-
* @param awsConfig a constructor for the AWS configuration
72-
* @param javaProvider the Java implementation
73-
* @return the row retriever provider
74-
*/
75-
public LeafPartitionRowRetrieverProvider createQueryEngineSelectorWithFallback(Supplier<DataFusionAwsConfig> awsConfig, LeafPartitionRowRetrieverProvider javaProvider) {
76-
if (context == null) {
77-
LOGGER.warn("Falling back to Java row retriever as DataFusion was not loaded");
78-
return javaProvider;
79-
} else {
80-
return QueryEngineSelector.javaAndDataFusion(javaProvider,
81-
new DataFusionLeafPartitionRowRetriever.Provider(awsConfig.get(), allocator, context));
82-
}
83-
}
84-
8568
/**
8669
* Creates a DataFusion row retriever provider for use in queries. If the DataFusion functions could not be loaded,
87-
* the returned row retriever will fail if it is used.
70+
* the returned row retriever will fail if it is used. This is usually wrapped with an engine selector to allow
71+
* switching between different data engines.
8872
*
8973
* @param awsConfig a constructor for the AWS configuration
9074
* @return the row retriever provider

java/query/query-lambda/src/main/java/sleeper/query/lambda/SqsLeafPartitionQueryLambda.java

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,8 @@
3333
import sleeper.foreign.datafusion.DataFusionAwsConfig;
3434
import sleeper.parquet.utils.TableHadoopConfigurationProvider;
3535
import sleeper.query.core.rowretrieval.LeafPartitionQueryExecutor;
36+
import sleeper.query.core.rowretrieval.LeafPartitionRowRetrieverProvider;
37+
import sleeper.query.core.rowretrieval.QueryEngineSelector;
3638
import sleeper.query.datafusion.DataFusionQueryContext;
3739
import sleeper.query.runner.rowretrieval.LeafPartitionRowRetrieverImpl;
3840
import sleeper.query.runner.tracker.DynamoDBQueryTracker;
@@ -74,13 +76,14 @@ public SqsLeafPartitionQueryLambda(
7476
TablePropertiesProvider tablePropertiesProvider = S3TableProperties.createProvider(instanceProperties, s3Client, dynamoClient);
7577
TableHadoopConfigurationProvider hadoopProvider = TableHadoopConfigurationProvider.withCache(
7678
TableHadoopConfigurationProvider.forQueryLambdas(instanceProperties));
79+
LeafPartitionRowRetrieverProvider javaProvider = new LeafPartitionRowRetrieverImpl.Provider(
80+
Executors.newFixedThreadPool(instanceProperties.getInt(QUERY_PROCESSOR_LAMBDA_ROW_RETRIEVAL_THREADS)), hadoopProvider);
81+
LeafPartitionRowRetrieverProvider dataFusionProvider = dataFusionContext.createDataFusionProvider(DataFusionAwsConfig::getDefault);
7782
messageHandler = new QueryMessageHandler(tablePropertiesProvider, new DynamoDBQueryTracker(instanceProperties, dynamoClient));
7883
processor = SqsLeafPartitionQueryProcessor.builder()
7984
.sqsClient(sqsClient).s3Client(s3Client).dynamoClient(dynamoClient)
8085
.instanceProperties(instanceProperties).tablePropertiesProvider(tablePropertiesProvider).hadoopProvider(hadoopProvider)
81-
.rowRetrieverProvider(dataFusionContext.createQueryEngineSelectorWithFallback(awsConfig,
82-
new LeafPartitionRowRetrieverImpl.Provider(
83-
Executors.newFixedThreadPool(instanceProperties.getInt(QUERY_PROCESSOR_LAMBDA_ROW_RETRIEVAL_THREADS)), hadoopProvider)))
86+
.rowRetrieverProvider(QueryEngineSelector.javaAndDataFusion(javaProvider, dataFusionProvider))
8487
.build();
8588
}
8689

0 commit comments

Comments
 (0)