Skip to content

Commit

Permalink
sink(ticdc): add limit to storage ddl sink (#9601)
Browse files Browse the repository at this point in the history
ref #9602
  • Loading branch information
CharlesCheung96 authored Aug 18, 2023
1 parent 9c7ae61 commit da4a195
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
20 changes: 17 additions & 3 deletions cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"encoding/json"
"net/url"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand All @@ -41,6 +42,8 @@ type ddlSink struct {
// statistic is used to record the DDL metrics
statistics *metrics.Statistics
storage storage.ExternalStorage

lastSendCheckpointTsTime time.Time
}

// NewCloudStorageDDLSink creates a ddl sink for cloud storage.
Expand All @@ -52,9 +55,10 @@ func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, er

changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
d := &ddlSink{
id: changefeedID,
storage: storage,
statistics: metrics.NewStatistics(ctx, sink.TxnSink),
id: changefeedID,
storage: storage,
statistics: metrics.NewStatistics(ctx, sink.TxnSink),
lastSendCheckpointTsTime: time.Now(),
}

return d, nil
Expand Down Expand Up @@ -102,6 +106,16 @@ func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
func (d *ddlSink) WriteCheckpointTs(ctx context.Context,
ts uint64, tables []*model.TableInfo,
) error {
if time.Since(d.lastSendCheckpointTsTime) < 2*time.Second {
log.Debug("skip write checkpoint ts to external storage",
zap.Any("changefeedID", d.id),
zap.Uint64("ts", ts))
return nil
}

defer func() {
d.lastSendCheckpointTsTime = time.Now()
}()
ckpt, err := json.Marshal(map[string]uint64{"checkpoint-ts": ts})
if err != nil {
return errors.Trace(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"path"
"testing"
"time"

timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -124,6 +125,7 @@ func TestWriteCheckpointTs(t *testing.T) {
},
}

time.Sleep(3 * time.Second)
err = sink.WriteCheckpointTs(ctx, 100, tables)
require.Nil(t, err)
metadata, err := os.ReadFile(path.Join(parentDir, "metadata"))
Expand Down

0 comments on commit da4a195

Please sign in to comment.