Skip to content

Commit e90080b

Browse files
committed
workload/changefeed: allow changefeed-cursor
This commit adds a new option, changefeed-cursor, which allows specifying the timestamp after which the changefeed should start emitting events and trigger a catch-up scan. If not specified, the changefeed defaults to using the current cluster logical timestamp. This helps tests specify catch-up scan behavior.
1 parent 1e00b30 commit e90080b

File tree

2 files changed

+9
-4
lines changed

2 files changed

+9
-4
lines changed

pkg/workload/changefeeds/changefeeds.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ func AddChangefeedToQueryLoad(
2727
gen workload.ConnFlagser,
2828
dbName string,
2929
resolvedTarget time.Duration,
30+
cursorStr string,
3031
urls []string,
3132
reg *histogram.Registry,
3233
ql *workload.QueryLoad,
@@ -76,9 +77,10 @@ func AddChangefeedToQueryLoad(
7677
return err
7778
}
7879

79-
var cursorStr string
80-
if err := conn.QueryRow(ctx, "SELECT cluster_logical_timestamp()").Scan(&cursorStr); err != nil {
81-
return err
80+
if cursorStr == "" {
81+
if err := conn.QueryRow(ctx, "SELECT cluster_logical_timestamp()").Scan(&cursorStr); err != nil {
82+
return err
83+
}
8284
}
8385

8486
tableNames := strings.Builder{}

pkg/workload/cli/run.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ var (
116116
"changefeed-max-rate", 0, "Maximum frequency of changefeed ingestion. If 0, no limit.")
117117
changefeedResolvedTarget = runFlags.Duration("changefeed-resolved-target", 5*time.Second,
118118
"The target frequency of resolved messages. O to disable resolved reporting and accept server defaults.")
119+
changefeedCursor = runFlags.String("changefeed-cursor", "",
120+
"The cursor to start the changefeed from. If empty, the changefeed will start from the current cluster logical timestamp.")
119121
)
120122

121123
func init() {
@@ -507,7 +509,8 @@ func runRun(gen workload.Generator, urls []string, dbName string) error {
507509

508510
if *changefeed {
509511
log.Dev.Infof(ctx, "adding changefeed to query load...")
510-
err = changefeeds.AddChangefeedToQueryLoad(ctx, gen.(workload.ConnFlagser), dbName, *changefeedResolvedTarget, urls, reg, &ops)
512+
err = changefeeds.AddChangefeedToQueryLoad(ctx, gen.(workload.ConnFlagser), dbName, *changefeedResolvedTarget, *changefeedCursor, urls, reg, &ops)
513+
511514
if err != nil && !*tolerateErrors {
512515
return errors.Wrapf(err, "failed to initialize changefeed")
513516
}

0 commit comments

Comments
 (0)