Skip to content

Commit 4accc16

Browse files
committed
workload/changefeed: add more logs
1 parent e90080b commit 4accc16

File tree

2 files changed

+17
-0
lines changed

2 files changed

+17
-0
lines changed

pkg/workload/changefeeds/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ go_library(
77
visibility = ["//visibility:public"],
88
deps = [
99
"//pkg/util/hlc",
10+
"//pkg/util/log",
1011
"//pkg/util/timeutil",
1112
"//pkg/workload",
1213
"//pkg/workload/histogram",

pkg/workload/changefeeds/changefeeds.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,16 @@ import (
1313
"time"
1414

1515
"github.com/cockroachdb/cockroach/pkg/util/hlc"
16+
"github.com/cockroachdb/cockroach/pkg/util/log"
1617
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
1718
"github.com/cockroachdb/cockroach/pkg/workload"
1819
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
1920
"github.com/cockroachdb/errors"
2021
"github.com/jackc/pgx/v5"
2122
)
2223

24+
var logResolvedEvery = log.Every(10 * time.Second)
25+
2326
// AddChangefeedToQueryLoad augments the passed QueryLoad to contain an extra
2427
// worker to run a changefeed over the tables of the generator.
2528
func AddChangefeedToQueryLoad(
@@ -122,10 +125,18 @@ func AddChangefeedToQueryLoad(
122125
return true
123126
}
124127
var rows pgx.Rows
128+
var changefeedStartTime time.Time
125129
maybeSetupRows := func() (done bool) {
126130
if rows != nil {
127131
return false
128132
}
133+
if changefeedStartTime.IsZero() {
134+
changefeedStartTime = timeutil.Now()
135+
}
136+
log.Dev.Infof(ctx, "creating changefeed with stmt: %s with args %v", stmt, args)
137+
if epoch, err := hlc.ParseHLC(cursorStr); err == nil {
138+
log.Dev.Infof(ctx, "starting a changefeed after %s", timeutil.Since(epoch.GoTime()))
139+
}
129140
var err error
130141
rows, err = conn.Query(cfCtx, stmt, args...)
131142
return maybeMarkDone(err)
@@ -169,6 +180,11 @@ func AddChangefeedToQueryLoad(
169180
return errors.Errorf("resolved timestamp %s is less than last resolved timestamp %s", resolved, lastResolved)
170181
}
171182
lastResolved = resolved
183+
if !lastResolved.IsEmpty() {
184+
if logResolvedEvery.ShouldLog() {
185+
log.Dev.Infof(ctx, "received resolved timestamp: lag=%s, ts=%s, sinceStart=%s", timeutil.Since(lastResolved.GoTime()), lastResolved, timeutil.Since(changefeedStartTime))
186+
}
187+
}
172188
} else {
173189
return errors.Errorf("failed to parse CHANGEFEED event: %s", values[2])
174190
}

0 commit comments

Comments
 (0)