From da4a195a4855cb447f4f7020abbbc30e3b35b6ec Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Fri, 18 Aug 2023 14:38:01 +0800 Subject: [PATCH] sink(ticdc): add limit to storage ddl sink (#9601) ref pingcap/tiflow#9602 --- .../cloudstorage/cloud_storage_ddl_sink.go | 20 ++++++++++++++++--- .../cloud_storage_ddl_sink_test.go | 2 ++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go index 69f8063f986..0d8f24bd5d7 100644 --- a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go +++ b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go @@ -17,6 +17,7 @@ import ( "context" "encoding/json" "net/url" + "time" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -41,6 +42,8 @@ type ddlSink struct { // statistic is used to record the DDL metrics statistics *metrics.Statistics storage storage.ExternalStorage + + lastSendCheckpointTsTime time.Time } // NewCloudStorageDDLSink creates a ddl sink for cloud storage. @@ -52,9 +55,10 @@ func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, er changefeedID := contextutil.ChangefeedIDFromCtx(ctx) d := &ddlSink{ - id: changefeedID, - storage: storage, - statistics: metrics.NewStatistics(ctx, sink.TxnSink), + id: changefeedID, + storage: storage, + statistics: metrics.NewStatistics(ctx, sink.TxnSink), + lastSendCheckpointTsTime: time.Now(), } return d, nil @@ -102,6 +106,16 @@ func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error func (d *ddlSink) WriteCheckpointTs(ctx context.Context, ts uint64, tables []*model.TableInfo, ) error { + if time.Since(d.lastSendCheckpointTsTime) < 2*time.Second { + log.Debug("skip write checkpoint ts to external storage", + zap.Any("changefeedID", d.id), + zap.Uint64("ts", ts)) + return nil + } + + defer func() { + d.lastSendCheckpointTsTime = time.Now() + }() ckpt, err := json.Marshal(map[string]uint64{"checkpoint-ts": ts}) if err != nil { return errors.Trace(err) diff --git a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go index e8703c2c674..23c10e83bea 100644 --- a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go +++ b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go @@ -20,6 +20,7 @@ import ( "os" "path" "testing" + "time" timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" @@ -124,6 +125,7 @@ func TestWriteCheckpointTs(t *testing.T) { }, } + time.Sleep(3 * time.Second) err = sink.WriteCheckpointTs(ctx, 100, tables) require.Nil(t, err) metadata, err := os.ReadFile(path.Join(parentDir, "metadata"))