From 81c22b1c1b2041e2806160d8c7e1105a70815ff5 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 26 Aug 2021 16:30:06 +0800 Subject: [PATCH] owner: fix gc safepoint larger by one (#2647) --- cdc/owner/gc_manager.go | 13 ++++++++----- cdc/owner/gc_manager_test.go | 2 +- tests/gc_safepoint/run.sh | 9 +++++++++ 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/cdc/owner/gc_manager.go b/cdc/owner/gc_manager.go index 471e767a108..9887bcb4874 100644 --- a/cdc/owner/gc_manager.go +++ b/cdc/owner/gc_manager.go @@ -81,9 +81,11 @@ func (m *gcManager) updateGCSafePoint(ctx cdcContext.Context, state *model.Globa default: continue } - checkpointTs := cfState.Info.GetCheckpointTs(cfState.Status) - if minCheckpointTs > checkpointTs { - minCheckpointTs = checkpointTs + // When the changefeed starts up, CDC will do a snapshot read at (checkpoint-ts - 1) from TiKV, + // so (checkpoint - 1) should be an upper bound for the GC safepoint. + gcSafepointUpperBound := cfState.Info.GetCheckpointTs(cfState.Status) - 1 + if minCheckpointTs > gcSafepointUpperBound { + minCheckpointTs = gcSafepointUpperBound } } m.lastUpdatedTime = time.Now() @@ -129,17 +131,18 @@ func (m *gcManager) currentTimeFromPDCached(ctx cdcContext.Context) (time.Time, } func (m *gcManager) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs model.Ts) error { + gcSafepointUpperBound := checkpointTs - 1 if m.isTiCDCBlockGC { pdTime, err := m.currentTimeFromPDCached(ctx) if err != nil { return errors.Trace(err) } - if pdTime.Sub(oracle.GetTimeFromTS(checkpointTs)) > time.Duration(m.gcTTL)*time.Second { + if pdTime.Sub(oracle.GetTimeFromTS(gcSafepointUpperBound)) > time.Duration(m.gcTTL)*time.Second { return cerror.ErrGCTTLExceeded.GenWithStackByArgs(checkpointTs, ctx.ChangefeedVars().ID) } } else { // if `isTiCDCBlockGC` is false, it means there is another service gc point less than the min checkpoint ts. - if checkpointTs < m.lastSafePointTs { + if gcSafepointUpperBound < m.lastSafePointTs { return cerror.ErrSnapshotLostByGC.GenWithStackByArgs(checkpointTs, m.lastSafePointTs) } } diff --git a/cdc/owner/gc_manager_test.go b/cdc/owner/gc_manager_test.go index 07386a84746..623731fe2ff 100644 --- a/cdc/owner/gc_manager_test.go +++ b/cdc/owner/gc_manager_test.go @@ -117,7 +117,7 @@ func (s *gcManagerSuite) TestUpdateGCSafePoint(c *check.C) { mockPDClient.updateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { c.Assert(serviceID, check.Equals, cdcServiceSafePointID) c.Assert(ttl, check.Equals, gcManager.gcTTL) - c.Assert(safePoint, check.Equals, uint64(20)) + c.Assert(safePoint, check.Equals, uint64(19)) return 0, nil } err = gcManager.updateGCSafePoint(ctx, state) diff --git a/tests/gc_safepoint/run.sh b/tests/gc_safepoint/run.sh index c49c060d783..3b7c3cf6c6c 100755 --- a/tests/gc_safepoint/run.sh +++ b/tests/gc_safepoint/run.sh @@ -16,6 +16,12 @@ function get_safepoint() { echo $safe_point } +function clear_gc_worker_safepoint() { + pd_addr=$1 + pd_cluster_id=$2 + ETCDCTL_API=3 etcdctl --endpoints=$pd_addr del /pd/$pd_cluster_id/gc/safe_point/service/ticdc +} + function check_safepoint_cleared() { pd_addr=$1 pd_cluster_id=$2 @@ -67,6 +73,7 @@ export -f check_safepoint_forward export -f check_safepoint_cleared export -f check_safepoint_equal export -f check_changefeed_state +export -f clear_gc_worker_safepoint function run() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR @@ -86,6 +93,8 @@ function run() { run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') + clear_gc_worker_safepoint $pd_addr $pd_cluster_id + run_sql "CREATE DATABASE gc_safepoint;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "CREATE table gc_safepoint.simple(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "INSERT INTO gc_safepoint.simple VALUES (),();" ${UP_TIDB_HOST} ${UP_TIDB_PORT}