@@ -139,12 +139,16 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
139139 }
140140 SourceType :: IngestV2 => {
141141 // Expect: the source should exist since we just read it from `get_source_configs`.
142+ // Note that we keep all shards, including Closed shards:
143+ // A closed shards still needs to be indexed.
142144 let shard_ids: Vec < ShardId > = model
143145 . list_shards_for_source ( & source_uid)
144146 . expect ( "source should exist" )
145147 . map ( |shard| shard. shard_id )
146148 . collect ( ) ;
147-
149+ if shard_ids. is_empty ( ) {
150+ continue ;
151+ }
148152 sources. push ( SourceToSchedule {
149153 source_uid,
150154 source_type : SourceToScheduleType :: Sharded {
@@ -695,7 +699,23 @@ mod tests {
695699 max_num_pipelines_per_indexer : NonZeroUsize :: new ( 2 ) . unwrap ( ) ,
696700 desired_num_pipelines : NonZeroUsize :: new ( 2 ) . unwrap ( ) ,
697701 enabled : true ,
698- // ingest v1
702+ // ingest v2
703+ source_params : SourceParams :: Ingest ,
704+ transform_config : None ,
705+ input_format : Default :: default ( ) ,
706+ } ,
707+ )
708+ . unwrap ( ) ;
709+ // ingest v2 without any open shard is skipped.
710+ model
711+ . add_source (
712+ & index_uid,
713+ SourceConfig {
714+ source_id : "ingest_v2_without_shard" . to_string ( ) ,
715+ max_num_pipelines_per_indexer : NonZeroUsize :: new ( 2 ) . unwrap ( ) ,
716+ desired_num_pipelines : NonZeroUsize :: new ( 2 ) . unwrap ( ) ,
717+ enabled : true ,
718+ // ingest v2
699719 source_params : SourceParams :: Ingest ,
700720 transform_config : None ,
701721 input_format : Default :: default ( ) ,
@@ -717,6 +737,14 @@ mod tests {
717737 } ,
718738 )
719739 . unwrap ( ) ;
740+ let shard = Shard {
741+ index_uid : index_uid. to_string ( ) ,
742+ source_id : "ingest_v2" . to_string ( ) ,
743+ shard_id : 17 ,
744+ shard_state : ShardState :: Open as i32 ,
745+ ..Default :: default ( )
746+ } ;
747+ model. insert_newly_opened_shards ( & index_uid, & "ingest_v2" . to_string ( ) , vec ! [ shard] , 18 ) ;
720748 let shards: Vec < SourceToSchedule > = get_sources_to_schedule ( & model) ;
721749 assert_eq ! ( shards. len( ) , 3 ) ;
722750 }
@@ -816,6 +844,7 @@ mod tests {
816844
817845 use quickwit_config:: SourceInputFormat ;
818846 use quickwit_proto:: indexing:: mcpu;
847+ use quickwit_proto:: ingest:: { Shard , ShardState } ;
819848
820849 fn kafka_source_params_for_test ( ) -> SourceParams {
821850 SourceParams :: Kafka ( KafkaSourceParams {
0 commit comments