Skip to content

Commit 47cb225

Browse files
committed
[CHORE]: Remove next_run from attached_functions
1 parent e7dde61 commit 47cb225

File tree

13 files changed

+5
-39
lines changed

13 files changed

+5
-39
lines changed

.github/workflows/_python-tests.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,6 @@ jobs:
141141
- "chromadb/test/distributed/test_sanity.py"
142142
- "chromadb/test/distributed/test_log_backpressure.py"
143143
- "chromadb/test/distributed/test_repair_collection_log_offset.py"
144-
- "chromadb/test/distributed/test_task_api.py"
145144
include:
146145
- test-glob: "chromadb/test/property/test_add.py"
147146
parallelized: false

go/pkg/sysdb/coordinator/create_task_test.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,6 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_IdempotentRequest_Alrea
286286
OutputCollectionName: outputCollectionName,
287287
FunctionID: functionID,
288288
MinRecordsForInvocation: int64(MinRecordsForInvocation),
289-
NextRun: now,
290289
CreatedAt: now,
291290
UpdatedAt: now,
292291
}
@@ -425,7 +424,6 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_RecoveryFlow() {
425424
OutputCollectionName: outputCollectionName,
426425
FunctionID: functionID,
427426
MinRecordsForInvocation: int64(MinRecordsForInvocation),
428-
NextRun: now,
429427
CreatedAt: now,
430428
UpdatedAt: now,
431429
}
@@ -516,7 +514,6 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_IdempotentRequest_Param
516514
OutputCollectionName: outputCollectionName,
517515
FunctionID: existingOperatorID,
518516
MinRecordsForInvocation: int64(MinRecordsForInvocation),
519-
NextRun: now,
520517
CreatedAt: now,
521518
UpdatedAt: now,
522519
}
@@ -595,7 +592,6 @@ func TestGetSoftDeletedAttachedFunctions_TimestampConsistency(t *testing.T) {
595592
MinRecordsForInvocation: 10,
596593
CreatedAt: testTime,
597594
UpdatedAt: testTime,
598-
NextRun: testTime,
599595
},
600596
}
601597

@@ -632,9 +628,6 @@ func TestGetSoftDeletedAttachedFunctions_TimestampConsistency(t *testing.T) {
632628
if af.UpdatedAt != expectedMicros {
633629
t.Errorf("UpdatedAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.UpdatedAt)
634630
}
635-
if af.NextRunAt != expectedMicros {
636-
t.Errorf("NextRunAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.NextRunAt)
637-
}
638631

639632
// Verify these are NOT in seconds (would be ~1000x smaller)
640633
expectedSeconds := uint64(testTime.Unix())
@@ -644,9 +637,6 @@ func TestGetSoftDeletedAttachedFunctions_TimestampConsistency(t *testing.T) {
644637
if af.UpdatedAt == expectedSeconds {
645638
t.Error("UpdatedAt appears to be in seconds instead of microseconds")
646639
}
647-
if af.NextRunAt == expectedSeconds {
648-
t.Error("NextRunAt appears to be in seconds instead of microseconds")
649-
}
650640

651641
mockMetaDomain.AssertExpectations(t)
652642
mockAttachedFunctionDb.AssertExpectations(t)

go/pkg/sysdb/coordinator/list_attached_functions_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_Success()
6464
DatabaseID: "db",
6565
CompletionOffset: 10,
6666
MinRecordsForInvocation: 5,
67-
NextRun: now,
6867
CreatedAt: now,
6968
UpdatedAt: now,
7069
},
@@ -79,7 +78,6 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_Success()
7978
DatabaseID: "db",
8079
CompletionOffset: 20,
8180
MinRecordsForInvocation: 15,
82-
NextRun: now,
8381
CreatedAt: now,
8482
UpdatedAt: now,
8583
},
@@ -157,7 +155,6 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_FunctionD
157155
DatabaseID: "db",
158156
CompletionOffset: 0,
159157
MinRecordsForInvocation: 1,
160-
NextRun: now,
161158
CreatedAt: now,
162159
UpdatedAt: now,
163160
}
@@ -192,7 +189,6 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_InvalidPa
192189
DatabaseID: "db",
193190
CompletionOffset: 0,
194191
MinRecordsForInvocation: 1,
195-
NextRun: now,
196192
CreatedAt: now,
197193
UpdatedAt: now,
198194
}

go/pkg/sysdb/coordinator/model/collection.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ type FlushCollectionInfo struct {
9999
CollectionVersion int32
100100
TenantLastCompactionTime int64
101101
// Optional attached function fields (only populated for attached-function-based compactions)
102-
AttachedFunctionNextRun *time.Time
103102
AttachedFunctionCompletionOffset *int64
104103
}
105104

go/pkg/sysdb/coordinator/task.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
111111

112112
// Validation passed, reuse the concurrent attached function's data
113113
attachedFunctionID = concurrentAttachedFunction.ID
114-
nextRun = concurrentAttachedFunction.NextRun
115114
// Already created, skip Phase 2
116115
skipPhase2 = true
117116
return nil
@@ -187,16 +186,13 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
187186
FunctionParams: paramsJSON,
188187
CompletionOffset: 0,
189188
LastRun: nil,
190-
NextRun: now,
191189
MinRecordsForInvocation: int64(req.MinRecordsForInvocation),
192190
CurrentAttempts: 0,
193191
CreatedAt: now,
194192
UpdatedAt: now,
195193
OldestWrittenNonce: nil,
196194
}
197195

198-
nextRun = attachedFunction.NextRun
199-
200196
err = s.catalog.metaDomain.AttachedFunctionDb(txCtx).Insert(attachedFunction)
201197
if err != nil {
202198
log.Error("AttachFunction: failed to insert attached function", zap.Error(err))
@@ -289,7 +285,6 @@ func attachedFunctionToProto(attachedFunction *dbmodel.AttachedFunction, functio
289285
MinRecordsForInvocation: uint64(attachedFunction.MinRecordsForInvocation),
290286
TenantId: attachedFunction.TenantID,
291287
DatabaseId: attachedFunction.DatabaseID,
292-
NextRunAt: uint64(attachedFunction.NextRun.UnixMicro()),
293288
CreatedAt: uint64(attachedFunction.CreatedAt.UnixMicro()),
294289
UpdatedAt: uint64(attachedFunction.UpdatedAt.UnixMicro()),
295290
}
@@ -729,7 +724,6 @@ func (s *Coordinator) GetSoftDeletedAttachedFunctions(ctx context.Context, req *
729724
UpdatedAt: uint64(af.UpdatedAt.UnixMicro()),
730725
}
731726

732-
protoAttachedFunctions[i].NextRunAt = uint64(af.NextRun.UnixMicro())
733727
if af.OutputCollectionID != nil {
734728
protoAttachedFunctions[i].OutputCollectionId = proto.String(*af.OutputCollectionID)
735729
}

go/pkg/sysdb/metastore/db/dbmodel/task.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ type AttachedFunction struct {
1818
FunctionParams string `gorm:"column:function_params;type:jsonb;not null"`
1919
CompletionOffset int64 `gorm:"column:completion_offset;type:bigint;not null;default:0"`
2020
LastRun *time.Time `gorm:"column:last_run;type:timestamp"`
21-
NextRun time.Time `gorm:"column:next_run;type:timestamp;not null"`
2221
MinRecordsForInvocation int64 `gorm:"column:min_records_for_invocation;type:bigint;not null;default:100"`
2322
CurrentAttempts int32 `gorm:"column:current_attempts;type:integer;not null;default:0"`
2423
IsAlive bool `gorm:"column:is_alive;type:boolean;not null;default:true"`
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- Remove next_run column from attached_functions table as it's no longer needed
2+
ALTER TABLE "public"."attached_functions" DROP COLUMN "next_run";

go/pkg/sysdb/metastore/db/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:MZM4Y7X9Du9xvcyQPGNROFh/x7cMwSqAn1G42UKNrTc=
1+
h1:bIntlbZeBztzLttsjmX5fudfrCbxOd/mgF4fJ91j21s=
22
20240313233558.sql h1:Gv0TiSYsqGoOZ2T2IWvX4BOasauxool8PrBOIjmmIdg=
33
20240321194713.sql h1:kVkNpqSFhrXGVGFFvL7JdK3Bw31twFcEhI6A0oCFCkg=
44
20240327075032.sql h1:nlr2J74XRU8erzHnKJgMr/tKqJxw9+R6RiiEBuvuzgo=
@@ -24,3 +24,4 @@ h1:MZM4Y7X9Du9xvcyQPGNROFh/x7cMwSqAn1G42UKNrTc=
2424
20251023154800.sql h1:zS764cNSc6y4e25cVr0GqJSrWdtwTvvxWXDTF51RAe8=
2525
20251029223300.sql h1:7ptfELfwHoDLLGRz6hLRYAq3OZyzNqa73U5DjlrN/YE=
2626
20251114125442.sql h1:oRHN+AO+xYnYa3aF0QzSa3T/TRi8ETCydp2sDT/nSnI=
27+
20251114134400.sql h1:N30qnVNjR+d4RoArJ11YrixyIsNODxqXFpgiREEhczs=

idl/chromadb/proto/coordinator.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,6 @@ message AttachedFunction {
575575
uint64 min_records_for_invocation = 9;
576576
string tenant_id = 10;
577577
string database_id = 11;
578-
uint64 next_run_at = 12;
579578
uint64 created_at = 15;
580579
uint64 updated_at = 16;
581580
string function_id = 17;

rust/sysdb/src/sysdb.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1743,10 +1743,6 @@ impl GrpcSysDb {
17431743
})?,
17441744
);
17451745

1746-
// Parse next_run timestamp from microseconds
1747-
let next_run = std::time::SystemTime::UNIX_EPOCH
1748-
+ std::time::Duration::from_micros(attached_function.next_run_at);
1749-
17501746
// Convert params from Struct to JSON string
17511747
let params_str = attached_function.params.map(|s| {
17521748
let json_value = prost_struct_to_json(s);
@@ -1795,7 +1791,6 @@ impl GrpcSysDb {
17951791
tenant_id: attached_function.tenant_id,
17961792
database_id: attached_function.database_id,
17971793
last_run: None,
1798-
next_run,
17991794
completion_offset: attached_function.completion_offset,
18001795
min_records_for_invocation: attached_function.min_records_for_invocation,
18011796
is_deleted: false,

0 commit comments

Comments
 (0)