Skip to content

[CLN]: make materialization function rather than struct #3165

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions rust/worker/src/execution/operators/brute_force_knn.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::execution::operator::Operator;
use crate::execution::operators::normalize_vectors::normalize;
use crate::segment::record_segment::RecordSegmentReader;
use crate::segment::LogMaterializer;
use crate::segment::LogMaterializerError;
use crate::segment::{materialize_logs, LogMaterializerError};
use async_trait::async_trait;
use chroma_blockstore::provider::BlockfileProvider;
use chroma_distance::DistanceFunction;
Expand Down Expand Up @@ -134,9 +133,7 @@ impl Operator<BruteForceKnnOperatorInput, BruteForceKnnOperatorOutput> for Brute
}
}
};
let log_materializer = LogMaterializer::new(record_segment_reader, input.log.clone(), None);
let logs = match log_materializer
.materialize()
let logs = match materialize_logs(&record_segment_reader, &input.log, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await
{
Expand Down
6 changes: 2 additions & 4 deletions rust/worker/src/execution/operators/count_records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ impl Operator<CountRecordsInput, CountRecordsOutput> for CountRecordsOperator {

#[cfg(test)]
mod tests {
use crate::segment::materialize_logs;
use crate::segment::record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError};
use crate::segment::types::SegmentFlusher;
use crate::segment::LogMaterializer;
use crate::{
execution::{
operator::Operator,
Expand Down Expand Up @@ -293,9 +293,7 @@ mod tests {
}
}
};
let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
let mat_records = materialize_logs(&record_segment_reader, &data, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await
.expect("Log materialization failed");
Expand Down
9 changes: 4 additions & 5 deletions rust/worker/src/execution/operators/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ use tracing::{trace, Instrument, Span};
use crate::{
execution::operator::Operator,
segment::{
materialize_logs,
metadata_segment::{MetadataSegmentError, MetadataSegmentReader},
record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
LogMaterializer, LogMaterializerError, MaterializedLogRecord,
LogMaterializerError, MaterializedLogRecord,
},
};

Expand Down Expand Up @@ -416,10 +417,8 @@ impl Operator<FilterInput, FilterOutput> for FilterOperator {
}
Err(e) => Err(*e),
}?;
let materializer =
LogMaterializer::new(record_segment_reader.clone(), input.logs.clone(), None);
let materialized_logs = materializer
.materialize()
let cloned_record_segment_reader = record_segment_reader.clone();
let materialized_logs = materialize_logs(&cloned_record_segment_reader, &input.logs, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await?;
let metadata_log_reader = MetadataLogReader::new(&materialized_logs);
Expand Down
21 changes: 9 additions & 12 deletions rust/worker/src/execution/operators/get_vectors_operator.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::{
execution::operator::Operator,
segment::{
materialize_logs,
record_segment::{self, RecordSegmentReader},
LogMaterializer, LogMaterializerError,
LogMaterializerError,
},
};
use async_trait::async_trait;
Expand Down Expand Up @@ -120,17 +121,13 @@ impl Operator<GetVectorsOperatorInput, GetVectorsOperatorOutput> for GetVectorsO
},
};
// Step 1: Materialize the logs.
let materializer = LogMaterializer::new(
record_segment_reader.clone(),
input.log_records.clone(),
None,
);
let mat_records = match materializer.materialize().await {
Ok(records) => records,
Err(e) => {
return Err(GetVectorsOperatorError::LogMaterialization(e));
}
};
let mat_records =
match materialize_logs(&record_segment_reader, &input.log_records, None).await {
Ok(records) => records,
Err(e) => {
return Err(GetVectorsOperatorError::LogMaterialization(e));
}
};

// Search the log records for the user ids
let mut remaining_search_user_ids: HashSet<String> =
Expand Down
11 changes: 3 additions & 8 deletions rust/worker/src/execution/operators/hnsw_knn.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::segment::record_segment::RecordSegmentReaderCreationError;
use crate::segment::{LogMaterializer, LogMaterializerError, MaterializedLogRecord};
use crate::segment::{materialize_logs, LogMaterializerError, MaterializedLogRecord};
use crate::{
execution::operator::Operator,
segment::{
Expand Down Expand Up @@ -147,13 +147,8 @@ impl Operator<HnswKnnOperatorInput, HnswKnnOperatorOutput> for HnswKnnOperator {
}
},
};
let log_materializer = LogMaterializer::new(
Some(record_segment_reader.clone()),
input.logs.clone(),
None,
);
let logs = match log_materializer
.materialize()
let some_reader = Some(record_segment_reader.clone());
let logs = match materialize_logs(&some_reader, &input.logs, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await
{
Expand Down
6 changes: 3 additions & 3 deletions rust/worker/src/execution/operators/knn_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use tonic::async_trait;
use crate::{
execution::operator::Operator,
segment::{
materialize_logs,
record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
LogMaterializer, LogMaterializerError,
LogMaterializerError,
},
};

Expand Down Expand Up @@ -72,8 +73,7 @@ impl Operator<KnnLogInput, KnnLogOutput> for KnnOperator {
Err(e) => Err(*e),
}?;

let materializer = LogMaterializer::new(record_segment_reader, input.logs.clone(), None);
let logs = materializer.materialize().await?;
let logs = materialize_logs(&record_segment_reader, &input.logs, None).await?;

let target_vector;
let target_embedding = if let DistanceFunction::Cosine = input.distance_function {
Expand Down
8 changes: 3 additions & 5 deletions rust/worker/src/execution/operators/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use tracing::{trace, Instrument, Span};
use crate::{
execution::operator::Operator,
segment::{
materialize_logs,
record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
LogMaterializer, LogMaterializerError,
LogMaterializerError,
},
};

Expand Down Expand Up @@ -213,10 +214,7 @@ impl Operator<LimitInput, LimitOutput> for LimitOperator {
let mut materialized_log_offset_ids = match &input.log_offset_ids {
SignedRoaringBitmap::Include(rbm) => rbm.clone(),
SignedRoaringBitmap::Exclude(rbm) => {
let materializer =
LogMaterializer::new(record_segment_reader.clone(), input.logs.clone(), None);
let materialized_logs = materializer
.materialize()
let materialized_logs = materialize_logs(&record_segment_reader, &input.logs, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await?;

Expand Down
12 changes: 4 additions & 8 deletions rust/worker/src/execution/operators/prefetch_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use tracing::{trace, Instrument, Span};
use crate::{
execution::operator::Operator,
segment::{
materialize_logs,
record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
LogMaterializer, LogMaterializerError,
LogMaterializerError,
},
};

Expand Down Expand Up @@ -84,13 +85,8 @@ impl Operator<PrefetchRecordInput, PrefetchRecordOutput> for PrefetchRecordOpera
Err(e) => return Err((*e).into()),
};

let materializer = LogMaterializer::new(
Some(record_segment_reader.clone()),
input.logs.clone(),
None,
);
let materialized_logs = materializer
.materialize()
let some_reader = Some(record_segment_reader.clone());
let materialized_logs = materialize_logs(&some_reader, &input.logs, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await?;

Expand Down
9 changes: 4 additions & 5 deletions rust/worker/src/execution/operators/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use tracing::{error, trace, Instrument, Span};
use crate::{
execution::operator::Operator,
segment::{
materialize_logs,
record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
LogMaterializer, LogMaterializerError,
LogMaterializerError,
},
};

Expand Down Expand Up @@ -104,10 +105,8 @@ impl Operator<ProjectionInput, ProjectionOutput> for ProjectionOperator {
}
Err(e) => Err(*e),
}?;
let materializer =
LogMaterializer::new(record_segment_reader.clone(), input.logs.clone(), None);
let materialized_logs = materializer
.materialize()

let materialized_logs = materialize_logs(&record_segment_reader, &input.logs, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await?;

Expand Down
18 changes: 8 additions & 10 deletions rust/worker/src/execution/operators/write_segments.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::segment::materialize_logs;
use crate::segment::metadata_segment::MetadataSegmentError;
use crate::segment::metadata_segment::MetadataSegmentWriter;
use crate::segment::record_segment::ApplyMaterializedLogError;
use crate::segment::record_segment::RecordSegmentReader;
use crate::segment::record_segment::RecordSegmentReaderCreationError;
use crate::segment::LogMaterializer;
use crate::segment::LogMaterializerError;
use crate::segment::SegmentWriter;
use crate::{
Expand Down Expand Up @@ -160,16 +160,14 @@ impl Operator<WriteSegmentsInput, WriteSegmentsOutput> for WriteSegmentsOperator
};
}
};
let materializer = LogMaterializer::new(
record_segment_reader,
input.chunk.clone(),
Some(input.next_offset_id.clone()),
);
// Materialize the logs.
let res = match materializer
.materialize()
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await
let res = match materialize_logs(
&record_segment_reader,
&input.chunk,
Some(input.next_offset_id.clone()),
)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await
{
Ok(records) => records,
Err(e) => {
Expand Down
44 changes: 16 additions & 28 deletions rust/worker/src/segment/metadata_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1075,11 +1075,12 @@ mod test {
#![allow(deprecated)]

use crate::segment::{
materialize_logs,
metadata_segment::{MetadataSegmentReader, MetadataSegmentWriter},
record_segment::{
RecordSegmentReader, RecordSegmentReaderCreationError, RecordSegmentWriter,
},
LogMaterializer, SegmentFlusher, SegmentWriter,
SegmentFlusher, SegmentWriter,
};
use chroma_blockstore::{
arrow::{config::TEST_MAX_BLOCK_SIZE_BYTES, provider::ArrowBlockfileProvider},
Expand Down Expand Up @@ -1193,9 +1194,7 @@ mod test {
}
}
};
let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
let mat_records = materialize_logs(&record_segment_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down Expand Up @@ -1265,9 +1264,8 @@ mod test {
MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider)
.await
.expect("Error creating segment writer");
let materializer = LogMaterializer::new(Some(record_segment_reader), data, None);
let mat_records = materializer
.materialize()
let some_reader = Some(record_segment_reader);
let mat_records = materialize_logs(&some_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down Expand Up @@ -1347,9 +1345,8 @@ mod test {
MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider)
.await
.expect("Error creating segment writer");
let materializer = LogMaterializer::new(Some(record_segment_reader), data, None);
let mat_records = materializer
.materialize()
let some_reader = Some(record_segment_reader);
let mat_records = materialize_logs(&some_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down Expand Up @@ -1487,9 +1484,7 @@ mod test {
}
}
};
let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
let mat_records = materialize_logs(&record_segment_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down Expand Up @@ -1566,9 +1561,8 @@ mod test {
MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider)
.await
.expect("Error creating segment writer");
let materializer = LogMaterializer::new(Some(record_segment_reader), data, None);
let mat_records = materializer
.materialize()
let some_reader = Some(record_segment_reader);
let mat_records = materialize_logs(&some_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down Expand Up @@ -1740,9 +1734,7 @@ mod test {
}
}
};
let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
let mat_records = materialize_logs(&record_segment_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down Expand Up @@ -1801,9 +1793,8 @@ mod test {
MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider)
.await
.expect("Error creating segment writer");
let materializer = LogMaterializer::new(Some(record_segment_reader), data, None);
let mat_records = materializer
.materialize()
let some_reader = Some(record_segment_reader);
let mat_records = materialize_logs(&some_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down Expand Up @@ -1962,9 +1953,7 @@ mod test {
}
}
};
let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
let mat_records = materialize_logs(&record_segment_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down Expand Up @@ -2021,9 +2010,8 @@ mod test {
MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider)
.await
.expect("Error creating segment writer");
let materializer = LogMaterializer::new(Some(record_segment_reader), data, None);
let mat_records = materializer
.materialize()
let some_reader = Some(record_segment_reader);
let mat_records = materialize_logs(&some_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down
Loading
Loading