Skip to content

Commit f8ab4ac

Browse files
committed
Use LazyCell for segments, fix write counter
1 parent 7245657 commit f8ab4ac

File tree

1 file changed

+11
-14
lines changed

1 file changed

+11
-14
lines changed

rust/worker/src/execution/orchestration/compact.rs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ pub struct CompactOrchestrator {
108108
max_compaction_size: usize,
109109
max_partition_size: usize,
110110
// Populated during the compaction process
111-
cached_segments: Option<Vec<Segment>>,
111+
cached_segments: OnceCell<Vec<Segment>>,
112112
writers: OnceCell<(
113113
RecordSegmentWriter,
114114
Box<DistributedHNSWSegmentWriter>,
@@ -197,7 +197,7 @@ impl CompactOrchestrator {
197197
result_channel,
198198
max_compaction_size,
199199
max_partition_size,
200-
cached_segments: None,
200+
cached_segments: OnceCell::new(),
201201
writers: OnceCell::new(),
202202
}
203203
}
@@ -310,7 +310,7 @@ impl CompactOrchestrator {
310310
},
311311
};
312312

313-
self.num_write_tasks = partitions.len() as i32;
313+
self.num_write_tasks = partitions.len() as i32 * 3; // 3 different segment types
314314
for partition in partitions.iter() {
315315
let operator = MaterializeLogOperator::new();
316316
let input = MaterializeLogInput::new(
@@ -426,17 +426,14 @@ impl CompactOrchestrator {
426426
}
427427

428428
async fn get_all_segments(&mut self) -> Result<Vec<Segment>, GetSegmentsError> {
429-
if let Some(segments) = &self.cached_segments {
430-
return Ok(segments.clone());
431-
}
432-
433-
let segments = self
434-
.sysdb
435-
.get_segments(None, None, None, self.collection_id)
436-
.await?;
437-
438-
self.cached_segments = Some(segments.clone());
439-
Ok(segments)
429+
self.cached_segments
430+
.get_or_try_init(|| async {
431+
self.sysdb
432+
.get_segments(None, None, None, self.collection_id)
433+
.await
434+
})
435+
.await
436+
.cloned()
440437
}
441438

442439
async fn get_segment(

0 commit comments

Comments
 (0)