Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: add chaos testing for advancer owner #58183

Merged
merged 5 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (c *CheckpointAdvancer) HasTask() bool {
return c.task != nil
}

// HasSubscriber returns whether the advancer is associated with a subscriber.
func (c *CheckpointAdvancer) HasSubscribion() bool {
// HasSubscriptions returns whether the advancer is associated with a subscriber.
func (c *CheckpointAdvancer) HasSubscriptions() bool {
c.subscriberMu.Lock()
defer c.subscriberMu.Unlock()

Expand All @@ -117,7 +117,7 @@ func newCheckpointWithTS(ts uint64) *checkpoint {
}
}

func NewCheckpointWithSpan(s spans.Valued) *checkpoint {
func newCheckpointWithSpan(s spans.Valued) *checkpoint {
return &checkpoint{
StartKey: s.Key.StartKey,
EndKey: s.Key.EndKey,
Expand Down Expand Up @@ -270,11 +270,6 @@ func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull)) {
f(c.checkpoints)
}

// only used for test
func (c *CheckpointAdvancer) NewCheckpoints(cps *spans.ValueSortedFull) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed since not used

c.checkpoints = cps
}

func (c *CheckpointAdvancer) fetchRegionHint(ctx context.Context, startKey []byte) string {
region, err := locateKeyOfRegion(ctx, c.env, startKey)
if err != nil {
Expand Down Expand Up @@ -473,7 +468,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
}

func (c *CheckpointAdvancer) setCheckpoint(s spans.Valued) bool {
cp := NewCheckpointWithSpan(s)
cp := newCheckpointWithSpan(s)
if cp.TS < c.lastCheckpoint.TS {
log.Warn("failed to update global checkpoint: stale",
zap.Uint64("old", c.lastCheckpoint.TS), zap.Uint64("new", cp.TS))
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func TestRemoveTaskAndFlush(t *testing.T) {
}, 10*time.Second, 100*time.Millisecond)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop"))
require.Eventually(t, func() bool {
return !adv.HasSubscribion()
return !adv.HasSubscriptions()
}, 10*time.Second, 100*time.Millisecond)
}

Expand Down
46 changes: 33 additions & 13 deletions br/pkg/streamhelper/config/advancer_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,21 @@ import (
const (
flagBackoffTime = "backoff-time"
flagTickInterval = "tick-interval"
flagFullScanDiffTick = "full-scan-tick"
Copy link
Contributor Author

@Tristan1900 Tristan1900 Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed since not used

flagAdvancingByCache = "advancing-by-cache"
flagTryAdvanceThreshold = "try-advance-threshold"
flagCheckPointLagLimit = "check-point-lag-limit"

DefaultConsistencyCheckTick = 5
DefaultTryAdvanceThreshold = 4 * time.Minute
DefaultCheckPointLagLimit = 48 * time.Hour
DefaultBackOffTime = 5 * time.Second
DefaultTickInterval = 12 * time.Second
DefaultFullScanTick = 4
DefaultAdvanceByCache = true
// used for chaos testing
BornChanger marked this conversation as resolved.
Show resolved Hide resolved
flagOwnershipCycleInterval = "ownership-cycle-interval"
)

const (
DefaultTryAdvanceThreshold = 4 * time.Minute
DefaultCheckPointLagLimit = 48 * time.Hour
BornChanger marked this conversation as resolved.
Show resolved Hide resolved
DefaultBackOffTime = 5 * time.Second
DefaultTickInterval = 12 * time.Second

// used for chaos testing, default to disable
DefaultOwnershipCycleInterval = 0
)

var (
Expand All @@ -38,6 +41,11 @@ type Config struct {
TryAdvanceThreshold time.Duration `toml:"try-advance-threshold" json:"try-advance-threshold"`
// The maximum lag could be tolerated for the checkpoint lag.
CheckPointLagLimit time.Duration `toml:"check-point-lag-limit" json:"check-point-lag-limit"`

// Following configs are used in chaos testings, better not to enable in prod
//
// used to periodically becomes/retire advancer owner
OwnershipCycleInterval time.Duration `toml:"ownership-cycle-interval" json:"ownership-cycle-interval"`
}

func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) {
Expand All @@ -49,14 +57,22 @@ func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) {
"If the checkpoint lag is greater than how long, we would try to poll TiKV for checkpoints.")
f.Duration(flagCheckPointLagLimit, DefaultCheckPointLagLimit,
"The maximum lag could be tolerated for the checkpoint lag.")

// used for chaos testing
f.Duration(flagOwnershipCycleInterval, DefaultOwnershipCycleInterval,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we hide the parameter?

f.MarkHidden(flagOwnershipCycleInterval)

"The interval that the owner will retire itself")

// mark hidden
_ = f.MarkHidden(flagOwnershipCycleInterval)
}

func Default() Config {
return Config{
BackoffTime: DefaultBackOffTime,
TickDuration: DefaultTickInterval,
TryAdvanceThreshold: DefaultTryAdvanceThreshold,
CheckPointLagLimit: DefaultCheckPointLagLimit,
BackoffTime: DefaultBackOffTime,
TickDuration: DefaultTickInterval,
TryAdvanceThreshold: DefaultTryAdvanceThreshold,
CheckPointLagLimit: DefaultCheckPointLagLimit,
OwnershipCycleInterval: DefaultOwnershipCycleInterval,
}
}

Expand All @@ -78,6 +94,10 @@ func (conf *Config) GetFromFlags(f *pflag.FlagSet) error {
if err != nil {
return err
}
conf.OwnershipCycleInterval, err = f.GetDuration(flagOwnershipCycleInterval)
if err != nil {
return err
}
return nil
}

Expand Down
8 changes: 8 additions & 0 deletions br/pkg/streamhelper/daemon/owner_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,11 @@ func (od *OwnerDaemon) Begin(ctx context.Context) (func(), error) {
}
return loop, nil
}

func (od *OwnerDaemon) ForceToBeOwner(ctx context.Context) error {
return od.manager.ForceToBeOwner(ctx)
}

func (od *OwnerDaemon) RetireIfOwner() {
od.manager.RetireOwner()
}
1 change: 0 additions & 1 deletion br/pkg/streamhelper/daemon/owner_daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ func TestDaemon(t *testing.T) {
ow.RetireOwner()
req.False(ow.IsOwner())
app.AssertNotRunning(1 * time.Second)
ow.CampaignOwner()
Copy link
Contributor Author

@Tristan1900 Tristan1900 Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't need to campaign again to become leader since only one node

req.Eventually(func() bool {
return ow.IsOwner()
}, 1*time.Second, 100*time.Millisecond)
Expand Down
38 changes: 38 additions & 0 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,8 @@ func RunStreamResume(
func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) error {
ctx, cancel := context.WithCancel(c)
defer cancel()
log.Info("starting", zap.String("cmd", cmdName))

mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
cfg.CheckRequirements, false, conn.StreamVersionChecker)
if err != nil {
Expand All @@ -941,10 +943,46 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre
if err != nil {
return err
}
if cfg.AdvancerCfg.OwnershipCycleInterval > 0 {
err = advancerd.ForceToBeOwner(ctx)
if err != nil {
return err
}
log.Info("command line advancer forced to be the owner")
go runOwnershipCycle(ctx, advancerd, cfg.AdvancerCfg.OwnershipCycleInterval, true)
}
loop()
return nil
}

// runOwnershipCycle handles the periodic cycling of ownership for the advancer
func runOwnershipCycle(ctx context.Context, advancerd *daemon.OwnerDaemon, cycleDuration time.Duration, isOwner bool) {
ticker := time.NewTicker(cycleDuration)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !isOwner {
// try to become owner
if err := advancerd.ForceToBeOwner(ctx); err != nil {
log.Error("command line advancer failed to force ownership", zap.Error(err))
continue
}
log.Info("command line advancer forced to be the owner")
isOwner = true
} else {
// retire from being owner
advancerd.RetireIfOwner()
log.Info("command line advancer retired from being owner")
isOwner = false
}
}
}
}

func checkConfigForStatus(pd []string) error {
if len(pd) == 0 {
return errors.Annotatef(berrors.ErrInvalidArgument,
Expand Down
1 change: 0 additions & 1 deletion pkg/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/config",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/streamhelper/config",
"//pkg/parser/terror",
"//pkg/util/logutil",
"//pkg/util/tiflashcompute",
Expand Down
8 changes: 0 additions & 8 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
zaplog "github.com/pingcap/log"
logbackupconf "github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/tiflashcompute"
Expand Down Expand Up @@ -458,13 +457,6 @@ func (b *AtomicBool) UnmarshalText(text []byte) error {
return nil
}

// LogBackup is the config for log backup service.
// For now, it includes the embed advancer.
type LogBackup struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove unused

Advancer logbackupconf.Config `toml:"advancer" json:"advancer"`
Enabled bool `toml:"enabled" json:"enabled"`
}

// Log is the log section of config.
type Log struct {
// Log level.
Expand Down
Loading