@@ -28,8 +28,10 @@ use databend_common_catalog::plan::PartInfoPtr;
2828use databend_common_catalog:: plan:: PushDownInfo ;
2929use databend_common_catalog:: plan:: TopK ;
3030use databend_common_catalog:: runtime_filter_info:: RuntimeFilterEntry ;
31+ use databend_common_catalog:: runtime_filter_info:: RuntimeFilterReady ;
3132use databend_common_catalog:: runtime_filter_info:: RuntimeFilterStats ;
3233use databend_common_catalog:: table_context:: TableContext ;
34+ use databend_common_exception:: ErrorCode ;
3335use databend_common_exception:: Result ;
3436use databend_common_expression:: filter_helper:: FilterHelpers ;
3537use databend_common_expression:: types:: BooleanType ;
@@ -58,7 +60,6 @@ use databend_common_pipeline_core::processors::InputPort;
5860use databend_common_pipeline_core:: processors:: OutputPort ;
5961use databend_common_pipeline_core:: processors:: Processor ;
6062use databend_common_pipeline_core:: processors:: ProcessorPtr ;
61- use databend_common_sql:: IndexType ;
6263use xorf:: BinaryFuse16 ;
6364
6465use super :: native_data_source:: NativeDataSource ;
@@ -199,7 +200,7 @@ pub struct NativeDeserializeDataTransform {
199200 scan_progress : Arc < Progress > ,
200201
201202 // Structures for table scan information:
202- table_index : IndexType ,
203+ scan_id : usize ,
203204 block_reader : Arc < BlockReader > ,
204205 src_schema : DataSchema ,
205206 output_schema : DataSchema ,
@@ -216,6 +217,8 @@ pub struct NativeDeserializeDataTransform {
216217 // Structures for the bloom runtime filter:
217218 ctx : Arc < dyn TableContext > ,
218219 bloom_runtime_filter : Option < Vec < BloomRuntimeFilterRef > > ,
220+ need_wait_runtime_filter : bool ,
221+ runtime_filter_ready : Vec < Arc < RuntimeFilterReady > > ,
219222
220223 // Structures for aggregating index:
221224 index_reader : Arc < Option < AggIndexReader > > ,
@@ -308,11 +311,13 @@ impl NativeDeserializeDataTransform {
308311 let mut output_schema = plan. schema ( ) . as_ref ( ) . clone ( ) ;
309312 output_schema. remove_internal_fields ( ) ;
310313 let output_schema: DataSchema = ( & output_schema) . into ( ) ;
314+ let need_wait_runtime_filter =
315+ !ctx. get_cluster ( ) . is_empty ( ) && ctx. get_wait_runtime_filter ( plan. scan_id ) ;
311316
312317 Ok ( ProcessorPtr :: create ( Box :: new (
313318 NativeDeserializeDataTransform {
314319 ctx,
315- table_index : plan. table_index ,
320+ scan_id : plan. scan_id ,
316321 func_ctx,
317322 scan_progress,
318323 block_reader,
@@ -334,6 +339,8 @@ impl NativeDeserializeDataTransform {
334339 bloom_runtime_filter : None ,
335340 read_state : ReadPartState :: new ( ) ,
336341 need_reserve_block_info,
342+ need_wait_runtime_filter,
343+ runtime_filter_ready : Vec :: new ( ) ,
337344 } ,
338345 ) ) )
339346 }
@@ -752,7 +759,7 @@ impl NativeDeserializeDataTransform {
752759 if self . bloom_runtime_filter . is_none ( ) {
753760 let bloom_filters = self
754761 . ctx
755- . get_runtime_filters ( self . table_index )
762+ . get_runtime_filters ( self . scan_id )
756763 . into_iter ( )
757764 . filter_map ( |entry| {
758765 let RuntimeFilterEntry { bloom, stats, .. } = entry;
@@ -774,6 +781,20 @@ impl NativeDeserializeDataTransform {
774781 }
775782 }
776783
784+ fn prepare_runtime_filter_wait ( & mut self ) -> bool {
785+ if !self . need_wait_runtime_filter {
786+ return false ;
787+ }
788+ self . need_wait_runtime_filter = false ;
789+ let runtime_filter_ready = self . ctx . get_runtime_filter_ready ( self . scan_id ) ;
790+ if !runtime_filter_ready. is_empty ( ) {
791+ self . runtime_filter_ready = runtime_filter_ready;
792+ true
793+ } else {
794+ false
795+ }
796+ }
797+
777798 /// Pre-process the partition before reading it.
778799 fn pre_process_partition ( & mut self ) -> Result < ( ) > {
779800 debug_assert ! ( !self . columns. is_empty( ) ) ;
@@ -848,6 +869,7 @@ impl NativeDeserializeDataTransform {
848869 }
849870}
850871
872+ #[ async_trait:: async_trait]
851873impl Processor for NativeDeserializeDataTransform {
852874 fn name ( & self ) -> String {
853875 String :: from ( "NativeDeserializeDataTransform" )
@@ -858,6 +880,10 @@ impl Processor for NativeDeserializeDataTransform {
858880 }
859881
860882 fn event ( & mut self ) -> Result < Event > {
883+ if self . prepare_runtime_filter_wait ( ) {
884+ return Ok ( Event :: Async ) ;
885+ }
886+
861887 if self . output . is_finished ( ) {
862888 self . input . finish ( ) ;
863889 return Ok ( Event :: Finished ) ;
@@ -903,6 +929,38 @@ impl Processor for NativeDeserializeDataTransform {
903929 Ok ( Event :: NeedData )
904930 }
905931
932+ #[ async_backtrace:: framed]
933+ async fn async_process ( & mut self ) -> Result < ( ) > {
934+ use std:: time:: Duration ;
935+
936+ use databend_common_base:: base:: tokio:: time:: timeout;
937+
938+ let timeout_duration = Duration :: from_secs ( 30 ) ;
939+
940+ for runtime_filter_ready in & self . runtime_filter_ready {
941+ let mut rx = runtime_filter_ready. runtime_filter_watcher . subscribe ( ) ;
942+ if ( * rx. borrow ( ) ) . is_some ( ) {
943+ continue ;
944+ }
945+
946+ match timeout ( timeout_duration, rx. changed ( ) ) . await {
947+ Ok ( Ok ( ( ) ) ) => { }
948+ Ok ( Err ( _) ) => {
949+ return Err ( ErrorCode :: TokioError ( "watcher's sender is dropped" ) ) ;
950+ }
951+ Err ( _) => {
952+ log:: warn!(
953+ "Runtime filter wait timeout after {:?} for scan_id: {}" ,
954+ timeout_duration,
955+ self . scan_id
956+ ) ;
957+ }
958+ }
959+ }
960+ self . runtime_filter_ready . clear ( ) ;
961+ Ok ( ( ) )
962+ }
963+
906964 fn process ( & mut self ) -> Result < ( ) > {
907965 // Try to get the bloom runtime filter from the context if existed.
908966 self . try_init_bloom_runtime_filter ( ) ;
0 commit comments