Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: add chaos testing for advancer owner #58183

Merged
merged 5 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (c *CheckpointAdvancer) HasTask() bool {
return c.task != nil
}

// HasSubscriber returns whether the advancer is associated with a subscriber.
func (c *CheckpointAdvancer) HasSubscribion() bool {
// HasSubscriptions returns whether the advancer is associated with a subscriber.
func (c *CheckpointAdvancer) HasSubscriptions() bool {
c.subscriberMu.Lock()
defer c.subscriberMu.Unlock()

Expand All @@ -117,7 +117,7 @@ func newCheckpointWithTS(ts uint64) *checkpoint {
}
}

func NewCheckpointWithSpan(s spans.Valued) *checkpoint {
func newCheckpointWithSpan(s spans.Valued) *checkpoint {
return &checkpoint{
StartKey: s.Key.StartKey,
EndKey: s.Key.EndKey,
Expand Down Expand Up @@ -270,11 +270,6 @@ func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull)) {
f(c.checkpoints)
}

// only used for test
func (c *CheckpointAdvancer) NewCheckpoints(cps *spans.ValueSortedFull) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed since not used

c.checkpoints = cps
}

func (c *CheckpointAdvancer) fetchRegionHint(ctx context.Context, startKey []byte) string {
region, err := locateKeyOfRegion(ctx, c.env, startKey)
if err != nil {
Expand Down Expand Up @@ -473,7 +468,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
}

func (c *CheckpointAdvancer) setCheckpoint(s spans.Valued) bool {
cp := NewCheckpointWithSpan(s)
cp := newCheckpointWithSpan(s)
if cp.TS < c.lastCheckpoint.TS {
log.Warn("failed to update global checkpoint: stale",
zap.Uint64("old", c.lastCheckpoint.TS), zap.Uint64("new", cp.TS))
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func TestRemoveTaskAndFlush(t *testing.T) {
}, 10*time.Second, 100*time.Millisecond)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop"))
require.Eventually(t, func() bool {
return !adv.HasSubscribion()
return !adv.HasSubscriptions()
}, 10*time.Second, 100*time.Millisecond)
}

Expand Down
28 changes: 19 additions & 9 deletions br/pkg/streamhelper/config/advancer_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,19 @@ import (
const (
flagBackoffTime = "backoff-time"
flagTickInterval = "tick-interval"
flagFullScanDiffTick = "full-scan-tick"
Copy link
Contributor Author

@Tristan1900 Tristan1900 Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed since not used

flagAdvancingByCache = "advancing-by-cache"
flagTryAdvanceThreshold = "try-advance-threshold"
flagCheckPointLagLimit = "check-point-lag-limit"

DefaultConsistencyCheckTick = 5
DefaultTryAdvanceThreshold = 4 * time.Minute
DefaultCheckPointLagLimit = 48 * time.Hour
DefaultBackOffTime = 5 * time.Second
DefaultTickInterval = 12 * time.Second
DefaultFullScanTick = 4
DefaultAdvanceByCache = true
// used for chaos testing
BornChanger marked this conversation as resolved.
Show resolved Hide resolved
flagOwnerRetireInterval = "advance-owner-resign-interval"

DefaultTryAdvanceThreshold = 4 * time.Minute
DefaultCheckPointLagLimit = 48 * time.Hour
BornChanger marked this conversation as resolved.
Show resolved Hide resolved
DefaultBackOffTime = 5 * time.Second
DefaultTickInterval = 12 * time.Second

// used for chaos testing, default to disable
DefaultAdvancerOwnerRetireInterval = 0
)

var (
Expand All @@ -38,6 +39,11 @@ type Config struct {
TryAdvanceThreshold time.Duration `toml:"try-advance-threshold" json:"try-advance-threshold"`
// The maximum lag could be tolerated for the checkpoint lag.
CheckPointLagLimit time.Duration `toml:"check-point-lag-limit" json:"check-point-lag-limit"`

// The following configs are used for chaos testing; avoid enabling them in production.
//
// used to periodically retire advancer owner for chaos testing
AdvancerOwnerRetireInterval time.Duration `toml:"advancer-owner-retire-interval" json:"advancer-owner-retire-interval"`
}

func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) {
Expand All @@ -49,6 +55,10 @@ func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) {
"If the checkpoint lag is greater than how long, we would try to poll TiKV for checkpoints.")
f.Duration(flagCheckPointLagLimit, DefaultCheckPointLagLimit,
"The maximum lag could be tolerated for the checkpoint lag.")

// used for chaos testing
f.Duration(flagOwnerRetireInterval, DefaultAdvancerOwnerRetireInterval,
"The interval that the owner will retire itself")
}

func Default() Config {
Expand Down
18 changes: 14 additions & 4 deletions br/pkg/streamhelper/daemon/owner_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,20 @@ type OwnerDaemon struct {

// When not `nil`, implies the daemon is running.
cancel context.CancelFunc

// Owner retire interval, used for chaos testing; not recommended in production.
// Defaults to 0, which disables the periodic retirement.
retireInterval time.Duration
ownerStartTime time.Time
}

// New creates a new owner daemon.
func New(daemon Interface, manager owner.Manager, tickInterval time.Duration) *OwnerDaemon {
func New(daemon Interface, manager owner.Manager, tickInterval time.Duration, retireInternal time.Duration) *OwnerDaemon {
return &OwnerDaemon{
daemon: daemon,
manager: manager,
tickInterval: tickInterval,
daemon: daemon,
manager: manager,
tickInterval: tickInterval,
retireInterval: retireInternal,
}
}

Expand All @@ -56,12 +62,16 @@ func (od *OwnerDaemon) ownerTick(ctx context.Context) {
log.Info("daemon became owner", zap.String("id", od.manager.ID()), zap.String("daemon-id", od.daemon.Name()))
// Note: maybe save the context so we can cancel the tick when we are not owner?
od.daemon.OnBecomeOwner(cx)
od.ownerStartTime = time.Now()
}

// Tick anyway.
if err := od.daemon.OnTick(ctx); err != nil {
log.Warn("failed on tick", logutil.ShortError(err))
}
if od.retireInterval != 0 && time.Now().Sub(od.ownerStartTime) > od.retireInterval {
od.manager.RetireOwner()
}
}

// Begin starts the daemon.
Expand Down
38 changes: 36 additions & 2 deletions br/pkg/streamhelper/daemon/owner_daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestDaemon(t *testing.T) {
req := require.New(t)
app := newTestApp(t)
ow := owner.NewMockManager(ctx, "owner_daemon_test", nil, "owner_key")
d := daemon.New(app, ow, 100*time.Millisecond)
d := daemon.New(app, ow, 100*time.Millisecond, 0)

app.AssertService(req, false)
f, err := d.Begin(ctx)
Expand All @@ -149,10 +149,44 @@ func TestDaemon(t *testing.T) {
ow.RetireOwner()
req.False(ow.IsOwner())
app.AssertNotRunning(1 * time.Second)
ow.CampaignOwner()
Copy link
Contributor Author

@Tristan1900 Tristan1900 Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't need to campaign again to become leader since only one node

req.Eventually(func() bool {
return ow.IsOwner()
}, 1*time.Second, 100*time.Millisecond)
app.AssertStart(1 * time.Second)
app.AssertTick(1 * time.Second)

// make sure chaos did not kick in so never retires
req.Neverf(func() bool {
return !ow.IsOwner()
}, 5*time.Second, 100*time.Millisecond, "should never retire")
}

// TestDaemonWithChaos starts an owner daemon configured with a non-zero
// retire interval (2s) and checks the ownership life cycle: the manager
// becomes owner, loses ownership once the interval has elapsed, and is
// expected to regain ownership afterwards.
func TestDaemonWithChaos(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	r := require.New(t)
	svc := newTestApp(t)
	mgr := owner.NewMockManager(ctx, "owner_daemon_test", nil, "owner_key")
	od := daemon.New(svc, mgr, 100*time.Millisecond, 2*time.Second)

	// The service must only be reported as running after Begin succeeds.
	svc.AssertService(r, false)
	loop, err := od.Begin(ctx)
	r.NoError(err)
	svc.AssertService(r, true)
	go loop()

	// Reports true while the manager does not hold ownership.
	notOwner := func() bool {
		return !mgr.IsOwner()
	}

	// Step 1: ownership is acquired shortly after the loop starts.
	r.Eventually(mgr.IsOwner, 1*time.Second, 100*time.Millisecond)

	// Step 2: the retire interval elapses and ownership is given up.
	r.Eventually(notOwner, 3*time.Second, 500*time.Millisecond)

	// Step 3: sanity check that ownership is obtained again afterwards.
	r.Eventually(mgr.IsOwner, 2*time.Second, 500*time.Millisecond)
}
2 changes: 1 addition & 1 deletion br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,7 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre
defer func() {
ownerMgr.Close()
}()
advancerd := daemon.New(advancer, ownerMgr, cfg.AdvancerCfg.TickDuration)
advancerd := daemon.New(advancer, ownerMgr, cfg.AdvancerCfg.TickDuration, cfg.AdvancerCfg.AdvancerOwnerRetireInterval)
loop, err := advancerd.Begin(ctx)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1557,7 +1557,7 @@ func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error {
}
adv := streamhelper.NewCheckpointAdvancer(env)
do.brOwnerMgr = streamhelper.OwnerManagerForLogBackup(ctx, do.etcdClient)
do.logBackupAdvancer = daemon.New(adv, do.brOwnerMgr, adv.Config().TickDuration)
do.logBackupAdvancer = daemon.New(adv, do.brOwnerMgr, adv.Config().TickDuration, adv.Config().AdvancerOwnerRetireInterval)
loop, err := do.logBackupAdvancer.Begin(ctx)
if err != nil {
return err
Expand Down