Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions go/pkg/sysdb/coordinator/create_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_IdempotentRequest_Alrea
OutputCollectionName: outputCollectionName,
FunctionID: functionID,
MinRecordsForInvocation: int64(MinRecordsForInvocation),
NextRun: now,
CreatedAt: now,
UpdatedAt: now,
}
Expand Down Expand Up @@ -425,7 +424,6 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_RecoveryFlow() {
OutputCollectionName: outputCollectionName,
FunctionID: functionID,
MinRecordsForInvocation: int64(MinRecordsForInvocation),
NextRun: now,
CreatedAt: now,
UpdatedAt: now,
}
Expand Down Expand Up @@ -516,7 +514,6 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_IdempotentRequest_Param
OutputCollectionName: outputCollectionName,
FunctionID: existingOperatorID,
MinRecordsForInvocation: int64(MinRecordsForInvocation),
NextRun: now,
CreatedAt: now,
UpdatedAt: now,
}
Expand Down Expand Up @@ -595,7 +592,6 @@ func TestGetSoftDeletedAttachedFunctions_TimestampConsistency(t *testing.T) {
MinRecordsForInvocation: 10,
CreatedAt: testTime,
UpdatedAt: testTime,
NextRun: testTime,
},
}

Expand Down Expand Up @@ -632,9 +628,6 @@ func TestGetSoftDeletedAttachedFunctions_TimestampConsistency(t *testing.T) {
if af.UpdatedAt != expectedMicros {
t.Errorf("UpdatedAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.UpdatedAt)
}
if af.NextRunAt != expectedMicros {
t.Errorf("NextRunAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.NextRunAt)
}

// Verify these are NOT in seconds (would be ~1000x smaller)
expectedSeconds := uint64(testTime.Unix())
Expand All @@ -644,9 +637,6 @@ func TestGetSoftDeletedAttachedFunctions_TimestampConsistency(t *testing.T) {
if af.UpdatedAt == expectedSeconds {
t.Error("UpdatedAt appears to be in seconds instead of microseconds")
}
if af.NextRunAt == expectedSeconds {
t.Error("NextRunAt appears to be in seconds instead of microseconds")
}

mockMetaDomain.AssertExpectations(t)
mockAttachedFunctionDb.AssertExpectations(t)
Expand Down
4 changes: 0 additions & 4 deletions go/pkg/sysdb/coordinator/list_attached_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_Success()
DatabaseID: "db",
CompletionOffset: 10,
MinRecordsForInvocation: 5,
NextRun: now,
CreatedAt: now,
UpdatedAt: now,
},
Expand All @@ -79,7 +78,6 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_Success()
DatabaseID: "db",
CompletionOffset: 20,
MinRecordsForInvocation: 15,
NextRun: now,
CreatedAt: now,
UpdatedAt: now,
},
Expand Down Expand Up @@ -157,7 +155,6 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_FunctionD
DatabaseID: "db",
CompletionOffset: 0,
MinRecordsForInvocation: 1,
NextRun: now,
CreatedAt: now,
UpdatedAt: now,
}
Expand Down Expand Up @@ -192,7 +189,6 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_InvalidPa
DatabaseID: "db",
CompletionOffset: 0,
MinRecordsForInvocation: 1,
NextRun: now,
CreatedAt: now,
UpdatedAt: now,
}
Expand Down
1 change: 0 additions & 1 deletion go/pkg/sysdb/coordinator/model/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ type FlushCollectionInfo struct {
CollectionVersion int32
TenantLastCompactionTime int64
// Optional attached function fields (only populated for attached-function-based compactions)
AttachedFunctionNextRun *time.Time
AttachedFunctionCompletionOffset *int64
}

Expand Down
6 changes: 0 additions & 6 deletions go/pkg/sysdb/coordinator/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att

// Validation passed, reuse the concurrent attached function's data
attachedFunctionID = concurrentAttachedFunction.ID
nextRun = concurrentAttachedFunction.NextRun
// Already created, skip Phase 2
skipPhase2 = true
return nil
Expand Down Expand Up @@ -187,16 +186,13 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
FunctionParams: paramsJSON,
CompletionOffset: 0,
LastRun: nil,
NextRun: now,
MinRecordsForInvocation: int64(req.MinRecordsForInvocation),
CurrentAttempts: 0,
CreatedAt: now,
UpdatedAt: now,
OldestWrittenNonce: nil,
}

nextRun = attachedFunction.NextRun

err = s.catalog.metaDomain.AttachedFunctionDb(txCtx).Insert(attachedFunction)
if err != nil {
log.Error("AttachFunction: failed to insert attached function", zap.Error(err))
Expand Down Expand Up @@ -289,7 +285,6 @@ func attachedFunctionToProto(attachedFunction *dbmodel.AttachedFunction, functio
MinRecordsForInvocation: uint64(attachedFunction.MinRecordsForInvocation),
TenantId: attachedFunction.TenantID,
DatabaseId: attachedFunction.DatabaseID,
NextRunAt: uint64(attachedFunction.NextRun.UnixMicro()),
CreatedAt: uint64(attachedFunction.CreatedAt.UnixMicro()),
UpdatedAt: uint64(attachedFunction.UpdatedAt.UnixMicro()),
}
Expand Down Expand Up @@ -729,7 +724,6 @@ func (s *Coordinator) GetSoftDeletedAttachedFunctions(ctx context.Context, req *
UpdatedAt: uint64(af.UpdatedAt.UnixMicro()),
}

protoAttachedFunctions[i].NextRunAt = uint64(af.NextRun.UnixMicro())
if af.OutputCollectionID != nil {
protoAttachedFunctions[i].OutputCollectionId = proto.String(*af.OutputCollectionID)
}
Expand Down
1 change: 0 additions & 1 deletion go/pkg/sysdb/metastore/db/dbmodel/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ type AttachedFunction struct {
FunctionParams string `gorm:"column:function_params;type:jsonb;not null"`
CompletionOffset int64 `gorm:"column:completion_offset;type:bigint;not null;default:0"`
LastRun *time.Time `gorm:"column:last_run;type:timestamp"`
NextRun time.Time `gorm:"column:next_run;type:timestamp;not null"`
MinRecordsForInvocation int64 `gorm:"column:min_records_for_invocation;type:bigint;not null;default:100"`
CurrentAttempts int32 `gorm:"column:current_attempts;type:integer;not null;default:0"`
IsAlive bool `gorm:"column:is_alive;type:boolean;not null;default:true"`
Expand Down
2 changes: 2 additions & 0 deletions go/pkg/sysdb/metastore/db/migrations/20251114134400.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Remove next_run column from attached_functions table as it's no longer needed
ALTER TABLE "public"."attached_functions" DROP COLUMN "next_run";
Comment on lines +1 to +2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[CompanyBestPractice]

This migration removes a column from the attached_functions table, which is a potentially unsafe operation. According to our SQL migration guidelines, removing a column requires careful consideration:

  1. Backward compatibility: Existing application code still referencing next_run will fail after this migration runs
  2. Zero-downtime deployment: If using blue-green or rolling deployments, old instances will crash when the column disappears
  3. No rollback path: The ALTER TABLE DROP COLUMN is irreversible without restoring from backup

Recommended approach for safer column removal:

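  1. First deploy: Stop reading the column and stop writing to it. Because next_run is declared NOT NULL, first make it nullable (or give it a default) so that inserts omitting it still succeed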
  2. Second deploy (after monitoring): Remove column references from code
  3. Third deploy: Run migration to drop column

Alternatively, document that this is a breaking change requiring a complete system shutdown during deployment.

Context for Agents
[**CompanyBestPractice**]

This migration removes a column from the `attached_functions` table, which is a potentially unsafe operation. According to our SQL migration guidelines, removing a column requires careful consideration:

1. **Backward compatibility**: Existing application code still referencing `next_run` will fail after this migration runs
2. **Zero-downtime deployment**: If using blue-green or rolling deployments, old instances will crash when the column disappears
3. **No rollback path**: The `ALTER TABLE DROP COLUMN` is irreversible without restoring from backup

Recommended approach for safer column removal:
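1. First deploy: Stop reading the column and stop writing to it. Because `next_run` is declared `NOT NULL`, first make it nullable (or give it a default) so that inserts omitting it still succeed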
2. Second deploy (after monitoring): Remove column references from code
3. Third deploy: Run migration to drop column

Alternatively, document that this is a breaking change requiring a complete system shutdown during deployment.

File: go/pkg/sysdb/metastore/db/migrations/20251114134400.sql
Line: 2

3 changes: 2 additions & 1 deletion go/pkg/sysdb/metastore/db/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:MZM4Y7X9Du9xvcyQPGNROFh/x7cMwSqAn1G42UKNrTc=
h1:bIntlbZeBztzLttsjmX5fudfrCbxOd/mgF4fJ91j21s=
20240313233558.sql h1:Gv0TiSYsqGoOZ2T2IWvX4BOasauxool8PrBOIjmmIdg=
20240321194713.sql h1:kVkNpqSFhrXGVGFFvL7JdK3Bw31twFcEhI6A0oCFCkg=
20240327075032.sql h1:nlr2J74XRU8erzHnKJgMr/tKqJxw9+R6RiiEBuvuzgo=
Expand All @@ -24,3 +24,4 @@ h1:MZM4Y7X9Du9xvcyQPGNROFh/x7cMwSqAn1G42UKNrTc=
20251023154800.sql h1:zS764cNSc6y4e25cVr0GqJSrWdtwTvvxWXDTF51RAe8=
20251029223300.sql h1:7ptfELfwHoDLLGRz6hLRYAq3OZyzNqa73U5DjlrN/YE=
20251114125442.sql h1:oRHN+AO+xYnYa3aF0QzSa3T/TRi8ETCydp2sDT/nSnI=
20251114134400.sql h1:N30qnVNjR+d4RoArJ11YrixyIsNODxqXFpgiREEhczs=
1 change: 0 additions & 1 deletion idl/chromadb/proto/coordinator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,6 @@ message AttachedFunction {
uint64 min_records_for_invocation = 9;
string tenant_id = 10;
string database_id = 11;
uint64 next_run_at = 12;
uint64 created_at = 15;
uint64 updated_at = 16;
string function_id = 17;
Expand Down
5 changes: 0 additions & 5 deletions rust/sysdb/src/sysdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1743,10 +1743,6 @@ impl GrpcSysDb {
})?,
);

// Parse next_run timestamp from microseconds
let next_run = std::time::SystemTime::UNIX_EPOCH
+ std::time::Duration::from_micros(attached_function.next_run_at);

// Convert params from Struct to JSON string
let params_str = attached_function.params.map(|s| {
let json_value = prost_struct_to_json(s);
Expand Down Expand Up @@ -1795,7 +1791,6 @@ impl GrpcSysDb {
tenant_id: attached_function.tenant_id,
database_id: attached_function.database_id,
last_run: None,
next_run,
completion_offset: attached_function.completion_offset,
min_records_for_invocation: attached_function.min_records_for_invocation,
is_deleted: false,
Expand Down
3 changes: 1 addition & 2 deletions rust/sysdb/src/test_sysdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,6 @@ fn attached_function_to_proto(
id: attached_function.id.0.to_string(),
name: attached_function.name.clone(),
function_name: attached_function.function_id.to_string(),
function_id: attached_function.function_id.to_string(),
input_collection_id: attached_function.input_collection_id.0.to_string(),
output_collection_name: attached_function.output_collection_name.clone(),
output_collection_id: attached_function
Expand All @@ -714,9 +713,9 @@ fn attached_function_to_proto(
min_records_for_invocation: attached_function.min_records_for_invocation,
tenant_id: attached_function.tenant_id.clone(),
database_id: attached_function.database_id.clone(),
next_run_at: system_time_to_micros(attached_function.next_run),
created_at: system_time_to_micros(attached_function.created_at),
updated_at: system_time_to_micros(attached_function.updated_at),
function_id: attached_function.function_id.to_string(),
}
}

Expand Down
4 changes: 0 additions & 4 deletions rust/types/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ impl ChromaError for AdvanceAttachedFunctionError {

#[derive(Debug, Clone)]
pub struct AdvanceAttachedFunctionResponse {
pub next_run: std::time::SystemTime,
pub completion_offset: u64,
}

Expand Down Expand Up @@ -126,8 +125,6 @@ pub enum FlushCompactionResponseConversionError {
InvalidUuid,
#[error("Invalid attached function nonce, valid UUID required")]
InvalidAttachedFunctionNonce,
#[error("Missing next_run timestamp")]
MissingNextRun,
#[error("Invalid timestamp format")]
InvalidTimestamp,
}
Expand All @@ -139,7 +136,6 @@ impl ChromaError for FlushCompactionResponseConversionError {
FlushCompactionResponseConversionError::InvalidAttachedFunctionNonce => {
ErrorCodes::InvalidArgument
}
FlushCompactionResponseConversionError::MissingNextRun => ErrorCodes::InvalidArgument,
FlushCompactionResponseConversionError::InvalidTimestamp => ErrorCodes::InvalidArgument,
FlushCompactionResponseConversionError::DecodeError(e) => e.code(),
}
Expand Down
3 changes: 0 additions & 3 deletions rust/types/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ pub struct AttachedFunction {
/// Timestamp of the last successful function run
#[serde(skip, default)]
pub last_run: Option<SystemTime>,
/// Timestamp when the attached function should next run
#[serde(skip, default = "default_systemtime")]
pub next_run: SystemTime,
/// Completion offset: the WAL position up to which the attached function has processed records
pub completion_offset: u64,
/// Minimum number of new records required before the attached function runs again
Expand Down
Loading