diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel
index 95f344c2f3c9c..af57eeb6f7003 100644
--- a/br/pkg/lightning/backend/local/BUILD.bazel
+++ b/br/pkg/lightning/backend/local/BUILD.bazel
@@ -163,6 +163,7 @@ go_test(
         "@com_github_stretchr_testify//require",
         "@com_github_tikv_client_go_v2//oracle",
         "@com_github_tikv_pd_client//:client",
+        "@com_github_tikv_pd_client//errs",
         "@org_golang_google_grpc//:grpc",
         "@org_golang_google_grpc//codes",
         "@org_golang_google_grpc//status",
diff --git a/br/pkg/lightning/backend/local/checksum.go b/br/pkg/lightning/backend/local/checksum.go
index f001712844200..05df4e32c92c7 100644
--- a/br/pkg/lightning/backend/local/checksum.go
+++ b/br/pkg/lightning/backend/local/checksum.go
@@ -328,12 +328,33 @@ func (e *TiKVChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo
 	return nil, err
 }
 
+var retryGetTSInterval = time.Second
+
 // Checksum implements the ChecksumManager interface.
 func (e *TiKVChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
 	tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name)
-	physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
-	if err != nil {
-		return nil, errors.Annotate(err, "fetch tso from pd failed")
+	var (
+		physicalTS, logicalTS int64
+		err                   error
+		retryTime             int
+	)
+	physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx)
+	for err != nil {
+		if !pd.IsLeaderChange(errors.Cause(err)) {
+			return nil, errors.Annotate(err, "fetch tso from pd failed")
+		}
+		retryTime++
+		if retryTime%60 == 0 {
+			log.FromContext(ctx).Warn("fetch tso from pd failed and retrying",
+				zap.Int("retryTime", retryTime),
+				zap.Error(err))
+		}
+		select {
+		case <-ctx.Done():
+			err = ctx.Err()
+		case <-time.After(retryGetTSInterval):
+			physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx)
+		}
 	}
 	ts := oracle.ComposeTS(physicalTS, logicalTS)
 	if err := e.manager.addOneJob(ctx, tbl, ts); err != nil {
diff --git a/br/pkg/lightning/backend/local/checksum_test.go b/br/pkg/lightning/backend/local/checksum_test.go
index eae3f7909e3eb..3506995514928 100644
--- a/br/pkg/lightning/backend/local/checksum_test.go
+++ b/br/pkg/lightning/backend/local/checksum_test.go
@@ -23,6 +23,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/tikv/client-go/v2/oracle"
 	pd "github.com/tikv/pd/client"
+	"github.com/tikv/pd/client/errs"
 	"go.uber.org/atomic"
 )
 
@@ -198,6 +199,18 @@ func TestDoChecksumWithTikv(t *testing.T) {
 		require.Zero(t, checksumExec.manager.currentTS)
 		require.Equal(t, 0, len(checksumExec.manager.tableGCSafeTS))
 	}
+
+	// test PD leader change error
+	backup := retryGetTSInterval
+	retryGetTSInterval = time.Millisecond
+	t.Cleanup(func() {
+		retryGetTSInterval = backup
+	})
+	pdClient.leaderChanging = true
+	kvClient.maxErrCount = 0
+	checksumExec := &TiKVChecksumManager{manager: newGCTTLManager(pdClient), client: kvClient}
+	_, err := checksumExec.Checksum(ctx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo})
+	require.NoError(t, err)
 }
 
 func TestDoChecksumWithErrorAndLongOriginalLifetime(t *testing.T) {
@@ -266,6 +279,7 @@ type testPDClient struct {
 	count            atomic.Int32
 	gcSafePoint      []safePointTTL
 	logicalTSCounter atomic.Uint64
+	leaderChanging   bool
 }
 
 func (c *testPDClient) currentSafePoint() uint64 {
@@ -282,6 +296,9 @@ func (c *testPDClient) currentSafePoint() uint64 {
 
 func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) {
 	physicalTS := time.Now().UnixMilli()
+	if c.leaderChanging && physicalTS%2 == 0 {
+		return 0, 0, errors.WithStack(errs.ErrClientTSOStreamClosed)
+	}
 	logicalTS := oracle.ExtractLogical(c.logicalTSCounter.Inc())
 	return physicalTS, logicalTS, nil
 }