Commit 88e8e29

Fix bug
1 parent da85ad7 commit 88e8e29

5 files changed: +61 −33 lines changed

rust/worker/src/execution/operators/apply_log_to_segment_writer.rs

Lines changed: 1 addition & 1 deletion

@@ -170,7 +170,7 @@ impl<Writer: SegmentWriter + Send + Sync + Clone>
         // Apply materialized records.
         match input
             .segment_writer
-            .apply_materialized_log_chunk(res.clone())
+            .apply_materialized_log_chunk(res)
             .instrument(tracing::trace_span!(
                 "Apply materialized logs",
                 segment = input.segment_writer.get_name()

rust/worker/src/execution/operators/limit.rs

Lines changed: 2 additions & 3 deletions

@@ -1,4 +1,4 @@
-use std::{cmp::Ordering, num::TryFromIntError, sync::atomic};
+use std::{cmp::Ordering, num::TryFromIntError};
 
 use chroma_blockstore::provider::BlockfileProvider;
 use chroma_error::{ChromaError, ErrorCodes};
@@ -118,8 +118,7 @@ impl<'me> SeekScanner<'me> {
 
         let mut size = self
             .record_segment
-            .get_current_max_offset_id()
-            .load(atomic::Ordering::Relaxed)
+            .get_max_offset_id()
             .max(self.log_offset_ids.max().unwrap_or(0));
         if size == 0 {
             return Ok(0);
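
For context, the sizing logic this hunk touches takes the larger of the record segment's max offset ID and the largest offset ID present in the log. A minimal standalone sketch of that computation (the values and the `segment_max` stand-in are made up for illustration):

```rust
fn main() {
    // Stand-in for record_segment.get_max_offset_id(); hypothetical value.
    let segment_max: u32 = 10;
    // Stand-in for the log's offset IDs.
    let log_offset_ids = [8u32, 12, 11];

    // Size of the seek space: the larger of the two maxima.
    let size = segment_max.max(log_offset_ids.iter().copied().max().unwrap_or(0));
    assert_eq!(size, 12);

    // With neither compacted records nor log records, size is 0, which
    // short-circuits the scan (the `if size == 0` branch above).
    let empty: [u32; 0] = [];
    assert_eq!(0u32.max(empty.iter().copied().max().unwrap_or(0)), 0);
}
```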

rust/worker/src/execution/orchestration/compact.rs

Lines changed: 22 additions & 5 deletions

@@ -284,6 +284,7 @@ impl CompactOrchestrator {
             >,
         >,
         chunk: Chunk<LogRecord>,
+        curr_max_offset_id: Arc<AtomicU32>,
     ) {
         let operator = ApplyLogToSegmentWriterOperator::new();
         let input = ApplyLogToSegmentWriterInput::new(
@@ -294,7 +295,7 @@ impl CompactOrchestrator {
                 .as_ref()
                 .expect("WriteSegmentsInput: Record segment not set in the input")
                 .clone(),
-            self.curr_max_offset_id.clone(),
+            curr_max_offset_id,
         );
         let task = wrap(operator, input, self_address.clone());
         match self.dispatcher.send(task, Some(Span::current())).await {
@@ -334,24 +335,40 @@ impl CompactOrchestrator {
 
         self.num_write_tasks = partitions.len() as i32 * 3;
         for partition in partitions.iter() {
+            // We must have an atomic that tracks the current offset ID across partitions to guarantee unique offset IDs for every record.
+            // Every partition has a log materializer that is responsible for updating this atomic.
+            // However, because the log materializer returns borrowed values, we currently run the materializer multiple times per partition (once for each segment writer).
+            // This means that only one log materializer run per partition should access and update the atomic shared across all partitions; the atomics for the other log materializer runs must not be shared.
+            let shared_curr_max_offset_id = self.curr_max_offset_id.clone();
+            let unshared_curr_max_offset_id1 = Arc::new(AtomicU32::new(
+                self.curr_max_offset_id
+                    .load(std::sync::atomic::Ordering::SeqCst),
+            ));
+            let unshared_curr_max_offset_id2 = Arc::new(AtomicU32::new(
+                self.curr_max_offset_id
+                    .load(std::sync::atomic::Ordering::SeqCst),
+            ));
             self.dispatch_apply_log_to_segment_writer_task(
                 record_segment_writer.clone(),
                 self_address.clone(),
                 partition.clone(),
+                shared_curr_max_offset_id,
             )
             .await;
 
             self.dispatch_apply_log_to_segment_writer_task(
                 hnsw_segment_writer.clone(),
                 self_address.clone(),
                 partition.clone(),
+                unshared_curr_max_offset_id1,
             )
             .await;
 
             self.dispatch_apply_log_to_segment_writer_task(
                 metadata_segment_writer.clone(),
                 self_address.clone(),
                 partition.clone(),
+                unshared_curr_max_offset_id2,
             )
             .await;
         }
@@ -480,7 +497,7 @@ impl CompactOrchestrator {
         tracing::debug!("Record Segment Writer created");
         match RecordSegmentReader::from_segment(record_segment, &self.blockfile_provider).await {
             Ok(reader) => {
-                self.curr_max_offset_id = reader.get_current_max_offset_id();
+                self.curr_max_offset_id = Arc::new(AtomicU32::new(reader.get_max_offset_id()));
             }
             Err(_) => {
                 self.curr_max_offset_id = Arc::new(AtomicU32::new(0));
@@ -676,9 +693,9 @@ impl Handler<TaskResult<ApplyLogToSegmentWriterOutput, ApplyLogToSegmentWriterOp
             .expect("Invariant violation. Writers not set.");
 
         self.flush_s3(
-            record_segment_writer.clone(),
-            hnsw_segment_writer.clone(),
-            metadata_segment_writer.clone(),
+            record_segment_writer,
+            hnsw_segment_writer,
+            metadata_segment_writer,
             ctx.receiver(),
         )
         .await;
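
The comment block added in this file is the heart of the fix: one materializer run per partition advances the shared counter, and the other two runs get private snapshots. To make the sharing rule concrete, here is a minimal sketch (not code from the repository; the values are made up):

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

fn main() {
    // The orchestrator-wide counter, as in CompactOrchestrator::curr_max_offset_id.
    let shared = Arc::new(AtomicU32::new(100));

    // One run per partition shares the counter and advances it for real...
    let shared_curr_max_offset_id = shared.clone();
    // ...while the other two runs get private snapshots, so their increments
    // cannot double- or triple-count the same records.
    let unshared1 = Arc::new(AtomicU32::new(shared.load(Ordering::SeqCst)));
    let unshared2 = Arc::new(AtomicU32::new(shared.load(Ordering::SeqCst)));

    // Each of the three materializer runs bumps its own counter once for
    // the same record.
    shared_curr_max_offset_id.fetch_add(1, Ordering::SeqCst);
    unshared1.fetch_add(1, Ordering::SeqCst);
    unshared2.fetch_add(1, Ordering::SeqCst);

    // The shared counter advanced exactly once. Had all three runs cloned
    // the same Arc, it would read 103 and offset IDs would be skipped.
    assert_eq!(shared.load(Ordering::SeqCst), 101);
}
```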

rust/worker/src/segment/record_segment.rs

Lines changed: 15 additions & 9 deletions

@@ -13,8 +13,6 @@ use chroma_types::{
 use std::cmp::Ordering;
 use std::collections::HashMap;
 use std::fmt::{self, Debug, Formatter};
-use std::sync::atomic::AtomicU32;
-use std::sync::Arc;
 use thiserror::Error;
 use uuid::Uuid;
 
@@ -632,7 +630,7 @@ pub struct RecordSegmentReader<'me> {
     user_id_to_id: BlockfileReader<'me, &'me str, u32>,
     id_to_user_id: BlockfileReader<'me, u32, &'me str>,
     id_to_data: BlockfileReader<'me, u32, DataRecord<'me>>,
-    curr_max_offset_id: Arc<AtomicU32>,
+    max_offset_id: u32,
 }
 
 #[derive(Error, Debug)]
@@ -641,6 +639,8 @@ pub enum RecordSegmentReaderCreationError {
     UninitializedSegment,
     #[error("Blockfile Open Error")]
     BlockfileOpenError(#[from] Box<OpenError>),
+    #[error("Error reading blockfile: {0}")]
+    BlockfileReadError(#[from] Box<dyn ChromaError>),
     #[error("Segment has invalid number of files")]
     InvalidNumberOfFiles,
     // This case should never happen, so it's internal, but until our APIs rule it out, we have it.
@@ -655,6 +655,7 @@ impl ChromaError for RecordSegmentReaderCreationError {
     fn code(&self) -> ErrorCodes {
         match self {
             RecordSegmentReaderCreationError::BlockfileOpenError(e) => e.code(),
+            RecordSegmentReaderCreationError::BlockfileReadError(e) => e.code(),
             RecordSegmentReaderCreationError::InvalidNumberOfFiles => ErrorCodes::InvalidArgument,
             RecordSegmentReaderCreationError::UninitializedSegment => ErrorCodes::InvalidArgument,
             RecordSegmentReaderCreationError::DataRecordNotFound(_) => ErrorCodes::Internal,
@@ -695,10 +696,15 @@ impl RecordSegmentReader<'_> {
         };
         let existing_max_offset_id = match max_offset_id_bf_reader {
             Some(reader) => match reader.get("", MAX_OFFSET_ID).await {
-                Ok(Some(max_offset_id)) => Arc::new(AtomicU32::new(max_offset_id)),
-                Ok(None) | Err(_) => Arc::new(AtomicU32::new(0)),
+                Ok(Some(max_offset_id)) => max_offset_id,
+                Ok(None) => 0,
+                Err(e) => {
+                    return Err(Box::new(
+                        RecordSegmentReaderCreationError::BlockfileReadError(e),
+                    ))
+                }
             },
-            None => Arc::new(AtomicU32::new(0)),
+            None => 0,
         };
 
         let user_id_to_id = match blockfile_provider
@@ -760,12 +766,12 @@ impl RecordSegmentReader<'_> {
             user_id_to_id,
             id_to_user_id,
             id_to_data,
-            curr_max_offset_id: existing_max_offset_id,
+            max_offset_id: existing_max_offset_id,
         })
     }
 
-    pub(crate) fn get_current_max_offset_id(&self) -> Arc<AtomicU32> {
-        self.curr_max_offset_id.clone()
+    pub(crate) fn get_max_offset_id(&self) -> u32 {
+        self.max_offset_id
     }
 
     pub(crate) async fn get_user_id_for_offset_id(
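
A side effect of storing a plain u32 snapshot is that read failures can no longer be silently mapped to 0; the new BlockfileReadError variant propagates them instead. Below is a minimal sketch of that thiserror `#[from]` conversion pattern, using a hypothetical MockReadError in place of the repository's boxed ChromaError (it assumes only the thiserror crate, which this file already uses):

```rust
use thiserror::Error;

// Hypothetical stand-in for a failed blockfile read.
#[derive(Error, Debug)]
#[error("blockfile read failed")]
struct MockReadError;

#[derive(Error, Debug)]
enum CreationError {
    // #[from] generates a From impl, so `?` and `.into()` convert the
    // read error into the creation error automatically.
    #[error("Error reading blockfile: {0}")]
    BlockfileReadError(#[from] MockReadError),
}

// Mirrors the new control flow: Ok(None) still means "empty segment,
// start at 0", but an Err is surfaced instead of being treated as 0.
fn max_offset_id(read: Result<Option<u32>, MockReadError>) -> Result<u32, CreationError> {
    match read {
        Ok(Some(id)) => Ok(id),
        Ok(None) => Ok(0),
        Err(e) => Err(e.into()),
    }
}

fn main() {
    assert_eq!(max_offset_id(Ok(Some(42))).unwrap(), 42);
    assert_eq!(max_offset_id(Ok(None)).unwrap(), 0);
    assert!(max_offset_id(Err(MockReadError)).is_err());
}
```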

rust/worker/src/segment/types.rs

Lines changed: 21 additions & 15 deletions

@@ -441,26 +441,20 @@ impl<'me> LogMaterializer<'me> {
             "Total length of logs in materializer: {}",
             self.logs.total_len()
         );
-        let next_offset_id;
-        match self.curr_offset_id.as_ref() {
+        let next_offset_id = match self.curr_offset_id.as_ref() {
             Some(curr_offset_id) => {
-                next_offset_id = curr_offset_id.clone();
-                next_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+                curr_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+                curr_offset_id.clone()
             }
             None => {
                 match self.record_segment_reader.as_ref() {
-                    Some(reader) => {
-                        next_offset_id = reader.get_current_max_offset_id();
-                        next_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
-                    }
-                    // This means that the segment is uninitialized so counting starts
-                    // from 1.
-                    None => {
-                        next_offset_id = Arc::new(AtomicU32::new(1));
-                    }
-                };
+                    Some(reader) => Arc::new(AtomicU32::new(reader.get_max_offset_id() + 1)),
+                    // This means that the segment is uninitialized so counting starts from 1.
+                    None => Arc::new(AtomicU32::new(1)),
+                }
             }
-        }
+        };
+
         // Populate entries that are present in the record segment.
         let mut existing_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new();
         let mut new_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new();
@@ -898,6 +892,9 @@ mod tests {
                     RecordSegmentReaderCreationError::BlockfileOpenError(_) => {
                         panic!("Error creating record segment reader");
                     }
+                    RecordSegmentReaderCreationError::BlockfileReadError(_) => {
+                        panic!("Error creating record segment reader");
+                    }
                     RecordSegmentReaderCreationError::InvalidNumberOfFiles => {
                         panic!("Error creating record segment reader");
                     }
@@ -1197,6 +1194,9 @@ mod tests {
                     RecordSegmentReaderCreationError::BlockfileOpenError(_) => {
                         panic!("Error creating record segment reader");
                     }
+                    RecordSegmentReaderCreationError::BlockfileReadError(_) => {
+                        panic!("Error creating record segment reader");
+                    }
                     RecordSegmentReaderCreationError::InvalidNumberOfFiles => {
                         panic!("Error creating record segment reader");
                     }
@@ -1488,6 +1488,9 @@ mod tests {
                     RecordSegmentReaderCreationError::BlockfileOpenError(_) => {
                         panic!("Error creating record segment reader");
                     }
+                    RecordSegmentReaderCreationError::BlockfileReadError(_) => {
+                        panic!("Error creating record segment reader");
+                    }
                     RecordSegmentReaderCreationError::InvalidNumberOfFiles => {
                         panic!("Error creating record segment reader");
                     }
@@ -1798,6 +1801,9 @@ mod tests {
                     RecordSegmentReaderCreationError::BlockfileOpenError(_) => {
                         panic!("Error creating record segment reader");
                     }
+                    RecordSegmentReaderCreationError::BlockfileReadError(_) => {
+                        panic!("Error creating record segment reader");
+                    }
                     RecordSegmentReaderCreationError::InvalidNumberOfFiles => {
                         panic!("Error creating record segment reader");
                     }
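
The rewritten next_offset_id block relies on AtomicU32::fetch_add, which returns the value *before* the increment, so every call hands out a fresh offset ID and reserves the next one. A small sketch of that allocation scheme (the starting offset of 5 is a hypothetical value for `reader.get_max_offset_id()`):

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

fn main() {
    // Segment already contains offsets up to 5, so allocation starts at 6,
    // mirroring the `Some(reader)` arm above.
    let next_offset_id = Arc::new(AtomicU32::new(5 + 1));

    // fetch_add returns the pre-increment value: each call yields a unique
    // offset ID.
    assert_eq!(next_offset_id.fetch_add(1, Ordering::SeqCst), 6);
    assert_eq!(next_offset_id.fetch_add(1, Ordering::SeqCst), 7);

    // Uninitialized segment: no reader, so counting starts from 1,
    // matching the `None => Arc::new(AtomicU32::new(1))` arm.
    let fresh = Arc::new(AtomicU32::new(1));
    assert_eq!(fresh.fetch_add(1, Ordering::SeqCst), 1);
}
```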
