ddl: dynamically adjusting the max write speed of reorganization job #57611

Open · wants to merge 19 commits into master · changes shown from 9 commits
38 changes: 23 additions & 15 deletions pkg/ddl/backfilling.go
@@ -708,8 +708,9 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
//nolint: forcetypeassert
discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load()))
bcCtx, err := ingest.LitBackCtxMgr.Register(
ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, job.RealStartTS)
ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, maxWriteSpeed, job.RealStartTS)
if err != nil {
return errors.Trace(err)
}
@@ -776,7 +777,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
if err != nil {
return err
}
err = executeAndClosePipeline(opCtx, pipe, job, avgRowSize)
err = executeAndClosePipeline(opCtx, pipe, job, bcCtx, avgRowSize)
if err != nil {
err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines)
if err1 != nil {
@@ -793,7 +794,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
}

func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) {
func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, bcCtx ingest.BackendCtx, avgRowSize int) {
opR, opW := pipe.GetLocalIngestModeReaderAndWriter()
if opR == nil || opW == nil {
logutil.DDLIngestLogger().Error("failed to get local ingest mode reader or writer", zap.Int64("jobID", job.ID))
@@ -817,23 +818,30 @@ func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job
case <-ctx.Done():
return
case <-ticker.C:
targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(
job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), avgRowSize)
maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load()))
if maxWriteSpeed != bcCtx.GetLocalBackend().GetLimiterSpeed() {
bcCtx.GetLocalBackend().UpdateLimiter(maxWriteSpeed)
logutil.DDLIngestLogger().Info("adjust ddl job config success",
zap.Int64("jobID", job.ID),
zap.Int("max write speed", maxWriteSpeed))
}

concurrency := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)
currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize()
if int32(targetReaderCnt) == currentReaderCnt && int32(targetWriterCnt) == currentWriterCnt {
continue
if int32(targetReaderCnt) != currentReaderCnt || int32(targetWriterCnt) != currentWriterCnt {
reader.TuneWorkerPoolSize(int32(targetReaderCnt))
writer.TuneWorkerPoolSize(int32(targetWriterCnt))
logutil.DDLIngestLogger().Info("adjust ddl job config success",
zap.Int64("jobID", job.ID),
zap.Int32("table scan operator count", reader.GetWorkerPoolSize()),
zap.Int32("index ingest operator count", writer.GetWorkerPoolSize()))
}
reader.TuneWorkerPoolSize(int32(targetReaderCnt))
writer.TuneWorkerPoolSize(int32(targetWriterCnt))
logutil.DDLIngestLogger().Info("adjust ddl job config success",
zap.Int64("jobID", job.ID),
zap.Int32("table scan operator count", reader.GetWorkerPoolSize()),
zap.Int32("index ingest operator count", writer.GetWorkerPoolSize()))
}
}
}

func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) error {
func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job *model.Job, bcCtx ingest.BackendCtx, avgRowSize int) error {
err := pipe.Execute()
if err != nil {
return err
@@ -842,7 +850,7 @@ func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job
// Adjust worker pool size dynamically.
if job != nil {
go func() {
Contributor: This is a problem in the old code: we should wait for the goroutine to exit before leaving this function, otherwise it may keep accessing shared state and race with the caller.

Contributor Author: Updated in 50c4334, PTAL.
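For readers outside this thread, a minimal standalone sketch of the pattern the reviewer is asking for: stop the adjuster goroutine and wait for it before returning, so it can never outlive the caller. The helper names below are hypothetical and not the exact code in this PR.

```go
package main

import (
	"context"
	"sync"
	"time"
)

// runWithAdjuster runs execute while a background adjuster polls for config
// changes, then stops the adjuster and waits for it to exit before returning,
// so the goroutine can never race with the caller afterwards.
func runWithAdjuster(ctx context.Context, execute func() error, adjust func(context.Context)) error {
	adjCtx, cancel := context.WithCancel(ctx)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		adjust(adjCtx)
	}()
	err := execute()
	cancel()  // signal the adjuster to stop
	wg.Wait() // and wait for it before handing control back to the caller
	return err
}

func main() {
	_ = runWithAdjuster(context.Background(),
		func() error { time.Sleep(10 * time.Millisecond); return nil },
		func(ctx context.Context) { <-ctx.Done() },
	)
}
```

Applied to executeAndClosePipeline, the same shape would mean cancelling the adjuster's context and waiting on it before the function returns.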

adjustWorkerPoolSize(ctx, pipe, job, avgRowSize)
adjustWorkerPoolSize(ctx, pipe, job, bcCtx, avgRowSize)
}()
}

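The essence of the new adjustment loop in the hunk above is: on every tick, re-read the job's ReorgMeta and touch the write limiter only when the desired speed actually changed. A standalone sketch under those assumptions; the limiter type and method names stand in for the backend used in the diff.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// limiter stands in for the local backend's store write limiter.
type limiter struct{ speed int }

func (l *limiter) GetLimiterSpeed() int    { return l.speed }
func (l *limiter) UpdateLimiter(speed int) { l.speed = speed }

// pollAndAdjust re-reads the desired max write speed on every tick and only
// updates the limiter when the value actually changed, mirroring the check
// added to adjustWorkerPoolSize in this PR.
func pollAndAdjust(ctx context.Context, l *limiter, desired func() int) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if s := desired(); s != l.GetLimiterSpeed() {
				l.UpdateLimiter(s)
				fmt.Println("adjusted max write speed to", s)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	// Hypothetical target of 32 MiB/s; in TiDB this would come from job.ReorgMeta.
	pollAndAdjust(ctx, &limiter{}, func() int { return 32 << 20 })
}
```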
1 change: 1 addition & 0 deletions pkg/ddl/backfilling_dist_executor.go
@@ -154,6 +154,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) {
discovery,
job.ReorgMeta.ResourceGroupName,
job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())),
job.ReorgMeta.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load())),
job.RealStartTS,
)
}
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_read_index.go
@@ -114,14 +114,14 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
if err != nil {
return err
}
return executeAndClosePipeline(opCtx, pipe, nil, 0)
return executeAndClosePipeline(opCtx, pipe, nil, nil, 0)
}

pipe, err := r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency)
if err != nil {
return err
}
err = executeAndClosePipeline(opCtx, pipe, nil, 0)
err = executeAndClosePipeline(opCtx, pipe, nil, nil, 0)
if err != nil {
// For dist task local based ingest, checkpoint is unsupported.
// If there is an error we should keep local sort dir clean.
9 changes: 9 additions & 0 deletions pkg/ddl/db_test.go
@@ -1195,6 +1195,15 @@ func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) {
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 257;", "the value 257 for thread is out of range [1, 256]")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 31;", "the value 31 for batch_size is out of range [32, 10240]")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 10241;", "the value 10241 for batch_size is out of range [32, 10240]")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2PiB';", "the value 2PiB for max_write_speed is out of range [0, 1125899906842624]")
// valid config value
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 16;", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 64;", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '0';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '64';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2KB';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '3MiB';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '4 gb';", "ddl job 1 is not running")

// invalid job id
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 8;", "ddl job 1 is not running")
1 change: 1 addition & 0 deletions pkg/ddl/executor.go
@@ -4928,6 +4928,7 @@ func initJobReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) erro
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
m.SetBatchSize(variable.TidbOptInt(sv, 0))
}
m.SetMaxWriteSpeed(int(variable.DDLReorgMaxWriteSpeed.Load()))
}
setDistTaskParam := func() error {
m.IsDistReorg = variable.EnableDistTask.Load()
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
@@ -2429,7 +2429,7 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo
ctx := tidblogutil.WithCategory(ctx, "ddl-ingest")
if bc == nil {
bc, err = ingest.LitBackCtxMgr.Register(
ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1, reorgInfo.RealStartTS)
ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1, 0, reorgInfo.RealStartTS)
if err != nil {
return err
}
4 changes: 3 additions & 1 deletion pkg/ddl/ingest/backend_mgr.go
@@ -51,6 +51,7 @@ type BackendCtxMgr interface {
pdSvcDiscovery pd.ServiceDiscovery,
resourceGroupName string,
importConc int,
maxWriteSpeed int,
initTS uint64,
) (BackendCtx, error)
Unregister(jobID int64)
@@ -118,6 +119,7 @@ func (m *litBackendCtxMgr) Register(
pdSvcDiscovery pd.ServiceDiscovery,
resourceGroupName string,
concurrency int,
maxWriteSpeed int,
initTS uint64,
) (BackendCtx, error) {
bc, exist := m.Load(jobID)
@@ -136,7 +138,7 @@ func (m *litBackendCtxMgr) Register(
logutil.Logger(ctx).Error(LitErrCreateDirFail, zap.Error(err))
return nil, err
}
cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName, concurrency)
cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName, concurrency, maxWriteSpeed)
if err != nil {
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
3 changes: 2 additions & 1 deletion pkg/ddl/ingest/config.go
@@ -44,6 +44,7 @@ func genConfig(
unique bool,
resourceGroup string,
concurrency int,
maxWriteSpeed int,
) (*local.BackendConfig, error) {
cfg := &local.BackendConfig{
LocalStoreDir: jobSortPath,
@@ -68,7 +69,7 @@ func genConfig(
PausePDSchedulerScope: lightning.PausePDSchedulerScopeTable,
TaskType: kvutil.ExplicitTypeDDL,
DisableAutomaticCompactions: true,
StoreWriteBWLimit: int(variable.DDLReorgMaxWriteSpeed.Load()),
StoreWriteBWLimit: maxWriteSpeed,
}
// Each backend will build a single dir in lightning dir.
if ImporterRangeConcurrencyForTest != nil {
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/mock.go
@@ -57,7 +57,7 @@ func (m *MockBackendCtxMgr) CheckMoreTasksAvailable() (bool, error) {

// Register implements BackendCtxMgr.Register interface.
func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client,
pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, initTS uint64) (BackendCtx, error) {
pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, maxWriteSpeed int, initTS uint64) (BackendCtx, error) {
logutil.DDLIngestLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID))
if mockCtx, ok := m.runningJobs[jobID]; ok {
return mockCtx, nil
1 change: 1 addition & 0 deletions pkg/ddl/job_worker.go
@@ -867,6 +867,7 @@ func (w *worker) runOneJobStep(
if latestJob.IsAlterable() {
job.ReorgMeta.SetConcurrency(latestJob.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())))
job.ReorgMeta.SetBatchSize(latestJob.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())))
job.ReorgMeta.SetMaxWriteSpeed(latestJob.ReorgMeta.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load())))
}
}
}
11 changes: 11 additions & 0 deletions pkg/executor/operate_ddl_jobs.go
@@ -19,6 +19,7 @@ import (
"fmt"
"strconv"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl"
@@ -210,6 +211,16 @@ func (e *AlterDDLJobExec) updateReorgMeta(job *model.Job, byWho model.AdminComma
job.ReorgMeta.SetBatchSize(int(cons.Value.GetInt64()))
}
job.AdminOperator = byWho
case core.AlterDDLJobMaxWriteSpeed:
if opt.Value != nil {
cons := opt.Value.(*expression.Constant)
speed, err := units.RAMInBytes(cons.Value.GetString())
Contributor: ditto

if err != nil {
return errors.Trace(err)
}
job.ReorgMeta.SetMaxWriteSpeed(int(speed))
}
job.AdminOperator = byWho
default:
return errors.Errorf("unsupported admin alter ddl jobs config: %s", opt.Name)
}
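The max_write_speed string is parsed with units.RAMInBytes from github.com/docker/go-units, which accepts plain byte counts as well as suffixed values and treats suffixes as 1024-based. A small sketch of how the strings used in the tests above parse; the [0, 1125899906842624] range check is enforced by the DDL layer, not by go-units.

```go
package main

import (
	"fmt"

	"github.com/docker/go-units"
)

func main() {
	// Strings exercised by the new max_write_speed tests above.
	for _, s := range []string{"0", "64", "2KB", "3MiB", "4 gb", "2PiB"} {
		n, err := units.RAMInBytes(s)
		if err != nil {
			fmt.Println(s, "->", err)
			continue
		}
		// RAMInBytes is 1024-based, so "2KB" parses to 2048 bytes.
		fmt.Println(s, "->", n)
	}
}
```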
4 changes: 4 additions & 0 deletions pkg/executor/show_ddl_jobs.go
@@ -317,12 +317,16 @@ func showCommentsFromJob(job *model.Job) string {
if job.MayNeedReorg() {
concurrency := m.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
batchSize := m.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
maxWriteSpeed := m.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load()))
if concurrency != variable.DefTiDBDDLReorgWorkerCount {
labels = append(labels, fmt.Sprintf("thread=%d", concurrency))
}
if batchSize != variable.DefTiDBDDLReorgBatchSize {
labels = append(labels, fmt.Sprintf("batch_size=%d", batchSize))
}
if maxWriteSpeed != variable.DefTiDBDDLReorgMaxWriteSpeed {
labels = append(labels, fmt.Sprintf("max_write_speed=%d", maxWriteSpeed))
}
if m.TargetScope != "" {
labels = append(labels, fmt.Sprintf("service_scope=%s", m.TargetScope))
}
5 changes: 4 additions & 1 deletion pkg/executor/show_ddl_jobs_test.go
@@ -70,16 +70,18 @@ func TestShowCommentsFromJob(t *testing.T) {
UseCloudStorage: true,
Concurrency: 8,
BatchSize: 1024,
MaxWriteSpeed: 1024 * 1024,
}
res = showCommentsFromJob(job)
require.Equal(t, "ingest, DXF, cloud, thread=8, batch_size=1024", res)
require.Equal(t, "ingest, DXF, cloud, thread=8, batch_size=1024, max_write_speed=1048576", res)

job.ReorgMeta = &model.DDLReorgMeta{
ReorgTp: model.ReorgTypeLitMerge,
IsDistReorg: true,
UseCloudStorage: true,
Concurrency: variable.DefTiDBDDLReorgWorkerCount,
BatchSize: variable.DefTiDBDDLReorgBatchSize,
MaxWriteSpeed: variable.DefTiDBDDLReorgMaxWriteSpeed,
}
res = showCommentsFromJob(job)
require.Equal(t, "ingest, DXF, cloud", res)
@@ -90,6 +92,7 @@ func TestShowCommentsFromJob(t *testing.T) {
UseCloudStorage: true,
Concurrency: variable.DefTiDBDDLReorgWorkerCount,
BatchSize: variable.DefTiDBDDLReorgBatchSize,
MaxWriteSpeed: variable.DefTiDBDDLReorgMaxWriteSpeed,
TargetScope: "background",
}
res = showCommentsFromJob(job)
7 changes: 1 addition & 6 deletions pkg/lightning/backend/local/local.go
@@ -637,12 +637,7 @@ func NewBackend(
return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs()
}

var writeLimiter StoreWriteLimiter
if config.StoreWriteBWLimit > 0 {
writeLimiter = newStoreWriteLimiter(config.StoreWriteBWLimit)
} else {
writeLimiter = noopStoreWriteLimiter{}
}
writeLimiter := newStoreWriteLimiter(config.StoreWriteBWLimit)
local := &Backend{
pdCli: pdCli,
pdHTTPCli: pdHTTPCli,
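Dropping the noopStoreWriteLimiter branch above only works if newStoreWriteLimiter itself treats a non-positive limit as unlimited and can be retuned later by UpdateLimiter. A hedged sketch of that behaviour built on golang.org/x/time/rate; the actual TiDB limiter implementation may differ.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/time/rate"
)

// storeWriteLimiter sketches a limiter that is a no-op when the configured
// speed is non-positive and that can be retuned at runtime.
type storeWriteLimiter struct {
	limiter *rate.Limiter
	speed   int
}

func newStoreWriteLimiter(bytesPerSec int) *storeWriteLimiter {
	l := &storeWriteLimiter{limiter: rate.NewLimiter(rate.Inf, 0)}
	l.UpdateLimiter(bytesPerSec)
	return l
}

// UpdateLimiter retargets the limiter; <= 0 means unlimited, which replaces
// the old noopStoreWriteLimiter branch removed in the hunk above.
func (l *storeWriteLimiter) UpdateLimiter(bytesPerSec int) {
	l.speed = bytesPerSec
	if bytesPerSec <= 0 {
		l.limiter.SetLimit(rate.Inf)
		return
	}
	l.limiter.SetBurst(bytesPerSec)
	l.limiter.SetLimit(rate.Limit(bytesPerSec))
}

func (l *storeWriteLimiter) GetLimiterSpeed() int { return l.speed }

// WaitN blocks until n bytes may be written to a store.
func (l *storeWriteLimiter) WaitN(ctx context.Context, n int) error {
	return l.limiter.WaitN(ctx, n)
}

func main() {
	l := newStoreWriteLimiter(0)             // unlimited
	_ = l.WaitN(context.Background(), 1<<20) // passes immediately
	l.UpdateLimiter(32 << 20)                // retune to 32 MiB/s at runtime
	fmt.Println("current speed:", l.GetLimiterSpeed())
}
```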