Skip to content

Commit f02e24a

Browse files
committed
[ENH]: Make all functions incremental
1 parent 3fec52a commit f02e24a

File tree

5 files changed

+543
-110
lines changed

5 files changed

+543
-110
lines changed

rust/segment/src/types.rs

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ impl ChromaError for LogMaterializerError {
125125
#[derive(Debug)]
126126
struct MaterializedLogRecord {
127127
// False if the record exists only in the log, otherwise true.
128-
offset_id_exists_in_segment: bool,
128+
pub offset_id_exists_in_segment: bool,
129129
// If present in the record segment then it is the offset id
130130
// in the record segment at which the record was found.
131131
// If not present in the segment then it is the offset id
@@ -504,26 +504,30 @@ impl MaterializeLogsResult {
504504
/// # Note
505505
/// This is primarily intended for testing and should not be used in production code.
506506
/// Use the `materialize_logs` function instead for proper log materialization.
507-
pub fn from_logs_for_test(logs: Chunk<LogRecord>) -> Result<Self, LogMaterializerError> {
508-
let mut materialized = Vec::new();
509-
for (index, (log_record, _)) in logs.iter().enumerate() {
510-
let offset_id = (index + 1) as u32;
511-
let mut mat_record =
512-
MaterializedLogRecord::from_log_record(offset_id, index, log_record)?;
513-
514-
// Override the operation for delete records
515-
if log_record.record.operation == Operation::Delete {
516-
mat_record.final_operation = MaterializedLogOperation::DeleteExisting;
517-
mat_record.final_document_at_log_index = None;
518-
mat_record.final_embedding_at_log_index = None;
519-
}
520-
521-
materialized.push(mat_record);
522-
}
523-
Ok(Self {
524-
logs,
525-
materialized: Chunk::new(materialized.into()),
526-
})
507+
pub async fn from_logs_for_test(
508+
logs: Chunk<LogRecord>,
509+
record_segment_reader: &Option<RecordSegmentReader<'_>>,
510+
) -> Result<Self, LogMaterializerError> {
511+
// let mut materialized = Vec::new();
512+
// for (index, (log_record, _)) in logs.iter().enumerate() {
513+
// let offset_id = (index + 1) as u32;
514+
// let mut mat_record =
515+
// MaterializedLogRecord::from_log_record(offset_id, index, log_record)?;
516+
517+
// // Override the operation for delete records
518+
// if log_record.record.operation == Operation::Delete {
519+
// mat_record.final_operation = MaterializedLogOperation::DeleteExisting;
520+
// mat_record.final_document_at_log_index = None;
521+
// mat_record.final_embedding_at_log_index = None;
522+
// }
523+
524+
// materialized.push(mat_record);
525+
// }
526+
// Ok(Self {
527+
// logs,
528+
// materialized: Chunk::new(materialized.into()),
529+
// })
530+
materialize_logs(record_segment_reader, logs, None).await
527531
}
528532
}
529533

rust/types/src/operation.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,18 @@ pub enum MaterializedLogOperation {
3232
DeleteExisting,
3333
}
3434

35+
impl std::fmt::Display for MaterializedLogOperation {
36+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37+
match self {
38+
MaterializedLogOperation::Initial => write!(f, "Initial"),
39+
MaterializedLogOperation::AddNew => write!(f, "AddNew"),
40+
MaterializedLogOperation::OverwriteExisting => write!(f, "OverwriteExisting"),
41+
MaterializedLogOperation::UpdateExisting => write!(f, "UpdateExisting"),
42+
MaterializedLogOperation::DeleteExisting => write!(f, "DeleteExisting"),
43+
}
44+
}
45+
}
46+
3547
#[derive(Error, Debug)]
3648
pub enum OperationConversionError {
3749
#[error("Invalid operation, valid operations are: Add, Upsert, Update, Delete")]

0 commit comments

Comments
 (0)