Skip to content

Commit cb021ee

Browse files
committed
address hammad + sicheng comments
1 parent 7ee5ef8 commit cb021ee

File tree

12 files changed

+98 additions, -76 deletions

go/pkg/sysdb/coordinator/task.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,10 @@ func attachedFunctionToProto(attachedFunction *dbmodel.AttachedFunction, functio
222222
return nil, status.Errorf(codes.Internal, "attached function has invalid completion_offset: %d", attachedFunction.CompletionOffset)
223223
}
224224

225+
if !attachedFunction.IsReady {
226+
return nil, status.Errorf(codes.Internal, "serialized attached function is not ready")
227+
}
228+
225229
attachedFunctionProto := &coordinatorpb.AttachedFunction{
226230
Id: attachedFunction.ID.String(),
227231
Name: attachedFunction.Name,
@@ -236,7 +240,6 @@ func attachedFunctionToProto(attachedFunction *dbmodel.AttachedFunction, functio
236240
DatabaseId: attachedFunction.DatabaseID,
237241
CreatedAt: uint64(attachedFunction.CreatedAt.UnixMicro()),
238242
UpdatedAt: uint64(attachedFunction.UpdatedAt.UnixMicro()),
239-
IsReady: attachedFunction.IsReady,
240243
}
241244
if attachedFunction.OutputCollectionID != nil {
242245
attachedFunctionProto.OutputCollectionId = attachedFunction.OutputCollectionID

go/pkg/sysdb/grpc/collection_service.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,9 @@ func (s *Server) FlushCollectionCompactionAndAttachedFunction(ctx context.Contex
579579
log.Error("FlushCollectionCompactionAndAttachedFunction failed. flush_compactions is empty")
580580
return nil, grpcutils.BuildInternalGrpcError("at least one flush_compaction is required")
581581
}
582+
583+
// Currently we only expect 1 or 2 flush_compactions. We expect the former in the case of backfills
584+
// and the latter in the case of normal compactions with an attached function.
582585
if len(flushReqs) > 2 {
583586
log.Error("FlushCollectionCompactionAndAttachedFunction failed. too many flush_compactions", zap.Int("count", len(flushReqs)))
584587
return nil, grpcutils.BuildInternalGrpcError("expected 1 or 2 flush_compactions")

idl/chromadb/proto/coordinator.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,6 @@ message AttachedFunction {
588588
uint64 created_at = 15;
589589
uint64 updated_at = 16;
590590
string function_id = 17;
591-
bool is_ready = 18;
592591
}
593592

594593
message GetAttachedFunctionByNameResponse {

rust/segment/src/types.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ impl MaterializeLogsResult {
504504
/// # Note
505505
/// This is primarily intended for testing and should not be used in production code.
506506
/// Use the `materialize_logs` function instead for proper log materialization.
507-
#[doc(hidden)]
507+
#[cfg(test)]
508508
pub fn from_logs_for_test(logs: Chunk<LogRecord>) -> Result<Self, LogMaterializerError> {
509509
let mut materialized = Vec::new();
510510
for (index, (log_record, _)) in logs.iter().enumerate() {

rust/sysdb/src/sysdb.rs

Lines changed: 17 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@ use chroma_types::chroma_proto::sys_db_client::SysDbClient;
99
use chroma_types::chroma_proto::VersionListForCollection;
1010
use chroma_types::{
1111
chroma_proto, chroma_proto::CollectionVersionInfo, CollectionAndSegments, CollectionFlushInfo,
12-
CollectionMetadataUpdate, CountCollectionsError, CreateCollectionError, CreateDatabaseError,
13-
CreateDatabaseResponse, CreateTenantError, CreateTenantResponse, Database,
14-
DeleteCollectionError, DeleteDatabaseError, DeleteDatabaseResponse, GetCollectionByCrnError,
15-
GetCollectionSizeError, GetCollectionWithSegmentsError, GetCollectionsError, GetDatabaseError,
16-
GetDatabaseResponse, GetSegmentsError, GetTenantError, GetTenantResponse,
17-
InternalCollectionConfiguration, InternalUpdateCollectionConfiguration,
18-
ListAttachedFunctionsError, ListCollectionVersionsError, ListDatabasesError,
19-
ListDatabasesResponse, Metadata, ResetError, ResetResponse, SegmentFlushInfo,
20-
SegmentFlushInfoConversionError, SegmentUuid, UpdateCollectionError, UpdateTenantError,
21-
UpdateTenantResponse,
12+
CollectionFlushInfoConversionError, CollectionMetadataUpdate, CountCollectionsError,
13+
CreateCollectionError, CreateDatabaseError, CreateDatabaseResponse, CreateTenantError,
14+
CreateTenantResponse, Database, DeleteCollectionError, DeleteDatabaseError,
15+
DeleteDatabaseResponse, GetCollectionByCrnError, GetCollectionSizeError,
16+
GetCollectionWithSegmentsError, GetCollectionsError, GetDatabaseError, GetDatabaseResponse,
17+
GetSegmentsError, GetTenantError, GetTenantResponse, InternalCollectionConfiguration,
18+
InternalUpdateCollectionConfiguration, ListAttachedFunctionsError, ListCollectionVersionsError,
19+
ListDatabasesError, ListDatabasesResponse, Metadata, ResetError, ResetResponse,
20+
SegmentFlushInfo, SegmentFlushInfoConversionError, SegmentUuid, UpdateCollectionError,
21+
UpdateTenantError, UpdateTenantResponse,
2222
};
2323
use chroma_types::{
2424
AttachedFunctionUpdateInfo, AttachedFunctionUuid, BatchGetCollectionSoftDeleteStatusError,
@@ -1657,38 +1657,10 @@ impl GrpcSysDb {
16571657
attached_function_update: AttachedFunctionUpdateInfo,
16581658
) -> Result<FlushCompactionAndAttachedFunctionResponse, FlushCompactionError> {
16591659
// Process all collections into flush compaction requests
1660-
let mut flush_compactions = Vec::with_capacity(collections.len());
1661-
1662-
for collection in collections {
1663-
let segment_compaction_info = collection
1664-
.segment_flush_info
1665-
.iter()
1666-
.map(|segment_flush_info| segment_flush_info.try_into())
1667-
.collect::<Result<
1668-
Vec<chroma_proto::FlushSegmentCompactionInfo>,
1669-
SegmentFlushInfoConversionError,
1670-
>>()?;
1671-
1672-
let schema_str = collection.schema.and_then(|s| {
1673-
serde_json::to_string(&s).ok().or_else(|| {
1674-
tracing::error!(
1675-
"Failed to serialize schema for flush_compaction_and_attached_function"
1676-
);
1677-
None
1678-
})
1679-
});
1680-
1681-
flush_compactions.push(chroma_proto::FlushCollectionCompactionRequest {
1682-
tenant_id: collection.tenant_id,
1683-
collection_id: collection.collection_id.0.to_string(),
1684-
log_position: collection.log_position,
1685-
collection_version: collection.collection_version,
1686-
segment_compaction_info,
1687-
total_records_post_compaction: collection.total_records_post_compaction,
1688-
size_bytes_post_compaction: collection.size_bytes_post_compaction,
1689-
schema_str,
1690-
});
1691-
}
1660+
let flush_compactions = collections
1661+
.into_iter()
1662+
.map(|collection| collection.try_into())
1663+
.collect::<Result<Vec<_>, _>>()?;
16921664

16931665
let attached_function_update_proto = Some(chroma_proto::AttachedFunctionUpdateInfo {
16941666
id: attached_function_update.attached_function_id.0.to_string(),
@@ -2137,6 +2109,8 @@ pub enum FlushCompactionError {
21372109
FailedToFlushCompaction(#[from] tonic::Status),
21382110
#[error("Failed to convert segment flush info")]
21392111
SegmentFlushInfoConversionError(#[from] SegmentFlushInfoConversionError),
2112+
#[error("Failed to convert collection flush info")]
2113+
CollectionFlushInfoConversionError(#[from] CollectionFlushInfoConversionError),
21402114
#[error("Failed to convert flush compaction response")]
21412115
FlushCompactionResponseConversionError(#[from] FlushCompactionResponseConversionError),
21422116
#[error("Collection not found in sysdb")]
@@ -2158,6 +2132,7 @@ impl ChromaError for FlushCompactionError {
21582132
}
21592133
}
21602134
FlushCompactionError::SegmentFlushInfoConversionError(_) => ErrorCodes::Internal,
2135+
FlushCompactionError::CollectionFlushInfoConversionError(_) => ErrorCodes::Internal,
21612136
FlushCompactionError::FlushCompactionResponseConversionError(_) => ErrorCodes::Internal,
21622137
FlushCompactionError::CollectionNotFound => ErrorCodes::Internal,
21632138
FlushCompactionError::SegmentNotFound => ErrorCodes::Internal,

rust/sysdb/src/test_sysdb.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -716,7 +716,6 @@ fn attached_function_to_proto(
716716
created_at: system_time_to_micros(attached_function.created_at),
717717
updated_at: system_time_to_micros(attached_function.updated_at),
718718
function_id: attached_function.function_id.to_string(),
719-
is_ready: false, // Default value since Rust struct doesn't track this field
720719
}
721720
}
722721

rust/types/src/flush.rs

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
use super::{AttachedFunctionUuid, CollectionUuid, ConversionError, Schema};
22
use crate::{
3-
chroma_proto::{
4-
FilePaths, FlushCollectionCompactionAndAttachedFunctionResponse, FlushSegmentCompactionInfo,
5-
},
3+
chroma_proto::{self, FilePaths, FlushSegmentCompactionInfo},
64
SegmentUuid,
75
};
86
use chroma_error::{ChromaError, ErrorCodes};
@@ -130,6 +128,41 @@ pub enum SegmentFlushInfoConversionError {
130128
DecodeError(#[from] ConversionError),
131129
}
132130

131+
/// Errors produced while converting collection flush info into its proto
/// request form.
#[derive(Error, Debug)]
132+
pub enum CollectionFlushInfoConversionError {
133+
/// A contained segment flush info failed to convert; the underlying
/// error is wrapped (via `#[from]`) and included in the message.
#[error("Failed to convert segment flush info: {0}")]
134+
SegmentConversionError(#[from] SegmentFlushInfoConversionError),
135+
/// The schema could not be serialized. This variant carries no
/// underlying error value.
#[error("Failed to serialize schema")]
136+
SchemaSerializationError,
137+
}
138+
139+
impl TryFrom<CollectionFlushInfo> for chroma_proto::FlushCollectionCompactionRequest {
140+
type Error = CollectionFlushInfoConversionError;
141+
142+
fn try_from(collection: CollectionFlushInfo) -> Result<Self, Self::Error> {
143+
let segment_compaction_info = collection
144+
.segment_flush_info
145+
.iter()
146+
.map(|segment_flush_info| segment_flush_info.try_into())
147+
.collect::<Result<Vec<_>, _>>()?;
148+
149+
let schema_str = collection
150+
.schema
151+
.and_then(|s| serde_json::to_string(&s).ok());
152+
153+
Ok(crate::chroma_proto::FlushCollectionCompactionRequest {
154+
tenant_id: collection.tenant_id,
155+
collection_id: collection.collection_id.0.to_string(),
156+
log_position: collection.log_position,
157+
collection_version: collection.collection_version,
158+
segment_compaction_info,
159+
total_records_post_compaction: collection.total_records_post_compaction,
160+
size_bytes_post_compaction: collection.size_bytes_post_compaction,
161+
schema_str,
162+
})
163+
}
164+
}
165+
133166
#[derive(Debug)]
134167
pub struct FlushCompactionResponse {
135168
pub collection_id: CollectionUuid,
@@ -158,13 +191,13 @@ impl FlushCompactionResponse {
158191
}
159192
}
160193

161-
impl TryFrom<FlushCollectionCompactionAndAttachedFunctionResponse>
194+
impl TryFrom<chroma_proto::FlushCollectionCompactionAndAttachedFunctionResponse>
162195
for FlushCompactionAndAttachedFunctionResponse
163196
{
164197
type Error = FlushCompactionResponseConversionError;
165198

166199
fn try_from(
167-
value: FlushCollectionCompactionAndAttachedFunctionResponse,
200+
value: chroma_proto::FlushCollectionCompactionAndAttachedFunctionResponse,
168201
) -> Result<Self, Self::Error> {
169202
// Parse all collections from the repeated field
170203
let mut collections = Vec::with_capacity(value.collections.len());

rust/worker/src/execution/operators/execute_task.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ impl AttachedFunctionExecutor for CountAttachedFunction {
6969
},
7070
};
7171

72-
Ok(Chunk::new(std::sync::Arc::from(vec![output_record])))
72+
Ok(Chunk::new(Arc::from(vec![output_record])))
7373
}
7474
}
7575

@@ -115,7 +115,7 @@ impl ExecuteAttachedFunctionOperator {
115115
#[derive(Debug)]
116116
pub struct ExecuteAttachedFunctionInput {
117117
/// The materialized log outputs to process
118-
pub materialized_logs: Arc<Vec<MaterializeLogOutput>>,
118+
pub materialized_logs: Vec<MaterializeLogOutput>,
119119
/// The tenant ID
120120
pub tenant_id: String,
121121
/// The output collection ID where results are written

rust/worker/src/execution/operators/materialize_logs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ impl MaterializeLogInput {
5656
}
5757
}
5858

59-
#[derive(Debug)]
59+
#[derive(Debug, Clone)]
6060
pub struct MaterializeLogOutput {
6161
pub result: MaterializeLogsResult,
6262
pub collection_logical_size_delta: i64,

rust/worker/src/execution/orchestration/apply_logs_orchestrator.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::HashMap, sync::Arc};
1+
use std::collections::HashMap;
22

33
use async_trait::async_trait;
44
use chroma_error::{ChromaError, ErrorCodes};
@@ -60,7 +60,7 @@ pub struct ApplyLogsOrchestrator {
6060
segment_spans: HashMap<SegmentUuid, Span>,
6161

6262
// Store the materialized outputs from LogFetchOrchestrator
63-
materialized_log_data: Option<Arc<Vec<MaterializeLogOutput>>>,
63+
materialized_log_data: Option<Vec<MaterializeLogOutput>>,
6464

6565
metrics: CompactionMetrics,
6666
}
@@ -181,7 +181,7 @@ impl ApplyLogsOrchestratorResponse {
181181
impl ApplyLogsOrchestrator {
182182
pub fn new(
183183
context: &CompactionContext,
184-
materialized_log_data: Option<Arc<Vec<MaterializeLogOutput>>>,
184+
materialized_log_data: Option<Vec<MaterializeLogOutput>>,
185185
) -> Self {
186186
ApplyLogsOrchestrator {
187187
context: context.clone(),

0 commit comments

Comments
 (0)