Skip to content

Commit

Permalink
gc(ticdc): do not block gc if there is no changefeed (#9634) (#9635)
Browse files Browse the repository at this point in the history
close #9633
  • Loading branch information
ti-chi-bot authored Aug 23, 2023
1 parent c0ac9fa commit 637d6c2
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
9 changes: 9 additions & 0 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
Expand Down Expand Up @@ -800,6 +801,14 @@ func (o *ownerImpl) calculateGCSafepoint(state *orchestrator.GlobalReactorState)
forceUpdateMap[upstreamID] = nil
}
}
// check if the upstream has a changefeed, if not we should update the gc safepoint
_ = o.upstreamManager.Visit(func(up *upstream.Upstream) error {
if _, exist := minCheckpointTsMap[up.ID]; !exist {
ts := up.PDClock.CurrentTime()
minCheckpointTsMap[up.ID] = oracle.GoTimeToTS(ts)
}
return nil
})
return minCheckpointTsMap, forceUpdateMap
}

Expand Down
26 changes: 15 additions & 11 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"bytes"
"context"
"fmt"
"math"
"math/rand"
"net/url"
"testing"
Expand Down Expand Up @@ -440,21 +439,11 @@ func TestUpdateGCSafePoint(t *testing.T) {
mockPDClient.UpdateServiceGCSafePointFunc = func(
ctx context.Context, serviceID string, ttl int64, safePoint uint64,
) (uint64, error) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
require.Equal(t, safePoint, uint64(math.MaxUint64-1))
return 0, nil
}
err := o.updateGCSafepoint(ctx, state)
require.Nil(t, err)

// add a failed changefeed, it must not trigger update GC safepoint.
mockPDClient.UpdateServiceGCSafePointFunc = func(
ctx context.Context, serviceID string, ttl int64, safePoint uint64,
) (uint64, error) {
t.Fatal("must not update")
return 0, nil
}
changefeedID1 := model.DefaultChangeFeedID("test-changefeed1")
tester.MustUpdate(
fmt.Sprintf("%s/changefeed/info/%s",
Expand Down Expand Up @@ -685,6 +674,7 @@ func TestCalculateGCSafepointTs(t *testing.T) {
expectMinTsMap := make(map[uint64]uint64)
expectForceUpdateMap := make(map[uint64]interface{})
o := ownerImpl{changefeeds: make(map[model.ChangeFeedID]*changefeed)}
o.upstreamManager = upstream.NewManager4Test(nil)

stateMap := []model.FeedState{
model.StateNormal, model.StateStopped, model.StatePending,
Expand Down Expand Up @@ -749,6 +739,20 @@ func TestCalculateGCSafepointTs(t *testing.T) {
require.Equal(t, expectForceUpdateMap, forceUpdateMap)
}

func TestCalculateGCSafepointTsNoChangefeed(t *testing.T) {
state := orchestrator.NewGlobalStateForTest(etcd.DefaultCDCClusterID)
expectForceUpdateMap := make(map[uint64]interface{})
o := ownerImpl{changefeeds: make(map[model.ChangeFeedID]*changefeed)}
o.upstreamManager = upstream.NewManager4Test(nil)
up, err := o.upstreamManager.GetDefaultUpstream()
require.Nil(t, err)
up.PDClock = pdutil.NewClock4Test()

minCheckpoinTsMap, forceUpdateMap := o.calculateGCSafepoint(state)
require.Equal(t, 1, len(minCheckpoinTsMap))
require.Equal(t, expectForceUpdateMap, forceUpdateMap)
}

// AsyncStop should cleanup jobs and reject.
func TestAsyncStop(t *testing.T) {
t.Parallel()
Expand Down

0 comments on commit 637d6c2

Please sign in to comment.