Skip to content

Commit 09a23f0

Browse files
committed
sicheng comments
1 parent 38ff914 commit 09a23f0

File tree

9 files changed

+119
-203
lines changed

9 files changed

+119
-203
lines changed

rust/sysdb/src/sysdb.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,6 @@ impl SysDb {
628628
}
629629
}
630630

631-
#[allow(clippy::too_many_arguments)]
632631
pub async fn flush_compaction_and_attached_function(
633632
&mut self,
634633
collections: Vec<CollectionFlushInfo>,

rust/types/src/flush.rs

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -158,27 +158,6 @@ impl FlushCompactionResponse {
158158
}
159159
}
160160

161-
impl TryFrom<FlushCollectionCompactionAndAttachedFunctionResponse> for FlushCompactionResponse {
162-
type Error = FlushCompactionResponseConversionError;
163-
164-
fn try_from(
165-
value: FlushCollectionCompactionAndAttachedFunctionResponse,
166-
) -> Result<Self, Self::Error> {
167-
// Use first collection for backward compatibility
168-
let first_collection = value
169-
.collections
170-
.first()
171-
.ok_or(FlushCompactionResponseConversionError::MissingCollections)?;
172-
let id = Uuid::parse_str(&first_collection.collection_id)
173-
.map_err(|_| FlushCompactionResponseConversionError::InvalidUuid)?;
174-
Ok(FlushCompactionResponse {
175-
collection_id: CollectionUuid(id),
176-
collection_version: first_collection.collection_version,
177-
last_compaction_time: first_collection.last_compaction_time,
178-
})
179-
}
180-
}
181-
182161
impl TryFrom<FlushCollectionCompactionAndAttachedFunctionResponse>
183162
for FlushCompactionAndAttachedFunctionResponse
184163
{

rust/worker/src/execution/functions/statistics.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,8 @@ impl AttachedFunctionExecutor for StatisticsFunctionExecutor {
181181
let mut counts: HashMap<String, HashMap<StatisticsValue, Box<dyn StatisticsFunction>>> =
182182
HashMap::default();
183183
for (hydrated_record, _index) in input_records.iter() {
184-
// Skip delete operations - they should not be counted in statistics
184+
// This is only applicable for non-incremental statistics.
185+
// TODO(tanujnay112): Change this when we make incremental statistics work.
185186
if hydrated_record.get_operation() == MaterializedLogOperation::DeleteExisting {
186187
continue;
187188
}

rust/worker/src/execution/operators/execute_task.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ impl AttachedFunctionExecutor for CountAttachedFunction {
5050
let records_count = input_records.len() as i64;
5151
let new_total_count = records_count;
5252

53-
println!("new_total_count is {}", new_total_count);
54-
5553
// Create output record with updated count
5654
let mut metadata = std::collections::HashMap::new();
5755
metadata.insert(

rust/worker/src/execution/orchestration/apply_logs_orchestrator.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ impl ApplyLogsOrchestrator {
206206
let mut tasks_to_run = Vec::new();
207207
self.num_materialized_logs += materialized_logs.len() as u64;
208208

209-
let writers = self.context.get_output_segment_writers()?;
209+
let writers = self.context.get_segment_writers()?;
210210

211211
{
212212
self.num_uncompleted_tasks_by_segment
@@ -255,7 +255,7 @@ impl ApplyLogsOrchestrator {
255255
materialized_logs.clone(),
256256
writers.record_reader.clone(),
257257
self.context
258-
.get_output_collection_info()?
258+
.get_collection_info()?
259259
.collection
260260
.schema
261261
.clone(),
@@ -356,7 +356,7 @@ impl ApplyLogsOrchestrator {
356356
.add(self.num_materialized_logs, &[]);
357357

358358
self.state = ExecutionState::Register;
359-
let collection_info = match self.context.get_output_collection_info() {
359+
let collection_info = match self.context.get_collection_info() {
360360
Ok(collection_info) => collection_info,
361361
Err(err) => {
362362
self.terminate_with_result(Err(err.into()), ctx).await;
@@ -525,7 +525,7 @@ impl Handler<TaskResult<ApplyLogToSegmentWriterOutput, ApplyLogToSegmentWriterOp
525525

526526
if message.segment_type == "MetadataSegmentWriter" {
527527
if let Some(update) = message.schema_update {
528-
let collection_info = match self.context.get_output_collection_info_mut() {
528+
let collection_info = match self.context.get_collection_info_mut() {
529529
Ok(info) => info,
530530
Err(err) => {
531531
return self.terminate_with_result(Err(err.into()), ctx).await;
@@ -582,9 +582,7 @@ impl Handler<TaskResult<ApplyLogToSegmentWriterOutput, ApplyLogToSegmentWriterOp
582582
};
583583

584584
if num_tasks_left == 0 && self.num_uncompleted_materialization_tasks == 0 {
585-
let segment_writer = self
586-
.context
587-
.get_output_segment_writer_by_id(message.segment_id);
585+
let segment_writer = self.context.get_segment_writer_by_id(message.segment_id);
588586
let segment_writer = match self.ok_or_terminate(segment_writer, ctx).await {
589587
Some(writer) => writer,
590588
None => return,
@@ -614,7 +612,7 @@ impl Handler<TaskResult<CommitSegmentWriterOutput, CommitSegmentWriterOperatorEr
614612

615613
// If the flusher received is a record segment flusher, get the number of keys for the blockfile and set it on the orchestrator
616614
if let ChromaSegmentFlusher::RecordSegment(record_segment_flusher) = &message.flusher {
617-
let collection_info = match self.context.get_output_collection_info_mut() {
615+
let collection_info = match self.context.get_collection_info_mut() {
618616
Ok(info) => info,
619617
Err(err) => {
620618
self.terminate_with_result(Err(err.into()), ctx).await;

rust/worker/src/execution/orchestration/attached_function_orchestrator.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ impl AttachedFunctionOrchestrator {
230230
&self,
231231
) -> Result<&CollectionCompactInfo, AttachedFunctionOrchestratorError> {
232232
self.output_context
233-
.get_output_collection_info()
233+
.get_collection_info()
234234
.map_err(AttachedFunctionOrchestratorError::CompactionContext)
235235
}
236236

@@ -239,7 +239,7 @@ impl AttachedFunctionOrchestrator {
239239
&self,
240240
) -> Result<CollectionUuid, AttachedFunctionOrchestratorError> {
241241
self.output_context
242-
.get_output_collection_info()
242+
.get_collection_info()
243243
.map(|info| info.collection_id)
244244
.map_err(AttachedFunctionOrchestratorError::CompactionContext)
245245
}
@@ -249,9 +249,7 @@ impl AttachedFunctionOrchestrator {
249249
&mut self,
250250
collection_info: CollectionCompactInfo,
251251
) -> Result<(), CollectionCompactInfo> {
252-
self.output_context
253-
.output_collection_info
254-
.set(collection_info)
252+
self.output_context.collection_info.set(collection_info)
255253
}
256254

257255
/// Get the function context if it has been set
@@ -339,7 +337,7 @@ impl AttachedFunctionOrchestrator {
339337
// NOTE: We allow writers to be uninitialized for the case when the materialized logs are empty
340338
let record_reader = self
341339
.output_context
342-
.get_output_segment_writers()
340+
.get_segment_writers()
343341
.ok()
344342
.and_then(|writers| writers.record_reader);
345343

@@ -363,7 +361,7 @@ impl AttachedFunctionOrchestrator {
363361
}
364362
};
365363

366-
let collection_info = match self.output_context.get_output_collection_info_mut() {
364+
let collection_info = match self.output_context.get_collection_info_mut() {
367365
Ok(info) => info,
368366
Err(err) => {
369367
return self.terminate_with_result(Err(err.into()), ctx).await;
@@ -490,7 +488,7 @@ impl Handler<TaskResult<GetAttachedFunctionOutput, GetAttachedFunctionOperatorEr
490488
);
491489

492490
// TODO(tanujnay112): Handle error
493-
let _ = self.function_context.set(FunctionContext {
491+
let _ = self.set_function_context(FunctionContext {
494492
attached_function_id: attached_function.id,
495493
function_id: attached_function.function_id,
496494
updated_completion_offset: attached_function.completion_offset,

0 commit comments

Comments
 (0)