diff --git a/DEPS.bzl b/DEPS.bzl index 26173b19a6e79..bfcecb16f2d04 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -3061,8 +3061,8 @@ def go_deps(): name = "com_github_pingcap_kvproto", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/kvproto", - sum = "h1:SnvWHM4JSkn9TFLIjrSRanpliqnmgk+y0MuoXC77y6I=", - version = "v0.0.0-20230524051921-3dc79e773139", + sum = "h1:dLoYgMFgzUaS6fAAPdjA7oGDM0LdCIm+qhgb3PzrDps=", + version = "v0.0.0-20230726063044-73d6d7f3756b", ) go_repository( name = "com_github_pingcap_log", @@ -3710,8 +3710,8 @@ def go_deps(): name = "com_github_tikv_pd_client", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/pd/client", - sum = "h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=", - version = "v0.0.0-20221031025758-80f0d8ca4d07", + sum = "h1:e4hLUKfgfPeJPZwOfU+/I/03G0sn6IZqVcbX/5o+hvM=", + version = "v0.0.0-20230904040343-947701a32c05", ) go_repository( name = "com_github_timakin_bodyclose", diff --git a/br/pkg/lightning/restore/BUILD.bazel b/br/pkg/lightning/restore/BUILD.bazel index fd33c7c77d91e..3c2be48b86e19 100644 --- a/br/pkg/lightning/restore/BUILD.bazel +++ b/br/pkg/lightning/restore/BUILD.bazel @@ -167,6 +167,7 @@ go_test( "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//testutils", "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//errs", "@com_github_xitongsys_parquet_go//writer", "@com_github_xitongsys_parquet_go_source//buffer", "@io_etcd_go_etcd_client_v3//:client", diff --git a/br/pkg/lightning/restore/checksum.go b/br/pkg/lightning/restore/checksum.go index b30fe14e01fc1..96fa7df007fc4 100644 --- a/br/pkg/lightning/restore/checksum.go +++ b/br/pkg/lightning/restore/checksum.go @@ -310,11 +310,32 @@ func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo return nil, err } +var retryGetTSInterval = time.Second + func (e *tikvChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) { tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name) - physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx) - if err != nil { - return nil, errors.Annotate(err, "fetch tso from pd failed") + var ( + physicalTS, logicalTS int64 + err error + retryTime int + ) + physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx) + for err != nil { + if !pd.IsLeaderChange(errors.Cause(err)) { + return nil, errors.Annotate(err, "fetch tso from pd failed") + } + retryTime++ + if retryTime%60 == 0 { + log.FromContext(ctx).Warn("fetch tso from pd failed and retrying", + zap.Int("retryTime", retryTime), + zap.Error(err)) + } + select { + case <-ctx.Done(): + err = ctx.Err() + case <-time.After(retryGetTSInterval): + physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx) + } } ts := oracle.ComposeTS(physicalTS, logicalTS) if err := e.manager.addOneJob(ctx, tbl, ts); err != nil { diff --git a/br/pkg/lightning/restore/checksum_test.go b/br/pkg/lightning/restore/checksum_test.go index 20acc23fe6be0..a981d33a9973f 100644 --- a/br/pkg/lightning/restore/checksum_test.go +++ b/br/pkg/lightning/restore/checksum_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/errs" "go.uber.org/atomic" ) @@ -198,6 +199,18 @@ func TestDoChecksumWithTikv(t *testing.T) { require.Zero(t, checksumExec.manager.currentTS) require.Equal(t, 0, len(checksumExec.manager.tableGCSafeTS)) } + + // test PD leader change error + backup := retryGetTSInterval + retryGetTSInterval = time.Millisecond + t.Cleanup(func() { + retryGetTSInterval = backup + }) + pdClient.leaderChanging = true + kvClient.maxErrCount = 0 + checksumExec := &tikvChecksumManager{manager: newGCTTLManager(pdClient), client: kvClient} + _, err = checksumExec.Checksum(ctx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo}) + require.NoError(t, err) } func TestDoChecksumWithErrorAndLongOriginalLifetime(t *testing.T) { @@ -233,6 +246,7 @@ type testPDClient struct { count atomic.Int32 gcSafePoint []safePointTTL logicalTSCounter atomic.Uint64 + leaderChanging bool } func (c *testPDClient) currentSafePoint() uint64 { @@ -250,6 +264,9 @@ func (c *testPDClient) currentSafePoint() uint64 { func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) { physicalTS := time.Now().UnixMilli() logicalTS := oracle.ExtractLogical(c.logicalTSCounter.Inc()) + if c.leaderChanging && physicalTS%2 == 0 { + return 0, 0, errors.WithStack(errs.ErrClientTSOStreamClosed) + } return physicalTS, logicalTS, nil } diff --git a/go.mod b/go.mod index 016ad40f8f632..525d38ea4075d 100644 --- a/go.mod +++ b/go.mod @@ -71,7 +71,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278 github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 - github.com/pingcap/kvproto v0.0.0-20230524051921-3dc79e773139 + github.com/pingcap/kvproto v0.0.0-20230726063044-73d6d7f3756b github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e @@ -91,7 +91,7 @@ require ( github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 github.com/tikv/client-go/v2 v2.0.4-0.20230817162610-e5fe1779769d - github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 + github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05 github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 github.com/uber/jaeger-client-go v2.22.1+incompatible diff --git a/go.sum b/go.sum index bf634c986981d..b34df292fc97c 100644 --- a/go.sum +++ b/go.sum @@ -780,13 +780,12 @@ github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059/go.mod h1:fMRU1BA1y+r89 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= -github.com/pingcap/kvproto v0.0.0-20230524051921-3dc79e773139 h1:SnvWHM4JSkn9TFLIjrSRanpliqnmgk+y0MuoXC77y6I= -github.com/pingcap/kvproto v0.0.0-20230524051921-3dc79e773139/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20230726063044-73d6d7f3756b h1:dLoYgMFgzUaS6fAAPdjA7oGDM0LdCIm+qhgb3PzrDps= +github.com/pingcap/kvproto v0.0.0-20230726063044-73d6d7f3756b/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= -github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= +github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c h1:crhkw6DD+07Bg1wYhW5Piw+kYNKZqFQqfC2puUf6gMI= github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 h1:HYbcxtnkN3s5tqrZ/z3eJS4j3Db8wMphEm1q10lY/TM= @@ -811,6 +810,7 @@ github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDf github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= +github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= github.com/prometheus/client_golang v1.13.0 h1:b71QUfeo5M8gq2+evJdTPfZhYMAU0uKPkyPJ7TPsloU= github.com/prometheus/client_golang v1.13.0/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ= @@ -937,8 +937,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= github.com/tikv/client-go/v2 v2.0.4-0.20230817162610-e5fe1779769d h1:xL9X0pr5wZn8onnW6i98sf7KjaZmk0OIQnDhW1z+hRQ= github.com/tikv/client-go/v2 v2.0.4-0.20230817162610-e5fe1779769d/go.mod h1:mmVCLP2OqWvQJPOIevQPZvGphzh/oq9vv8J5LDfpadQ= -github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc= -github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A= +github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05 h1:e4hLUKfgfPeJPZwOfU+/I/03G0sn6IZqVcbX/5o+hvM= +github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05/go.mod h1:MLIl+d2WbOF4A3U88WKtyXrQQW417wZDDvBcq2IW9bQ= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= @@ -1305,6 +1305,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=