ddl notifier: use pagination for SELECT to reduce memory usage #58376

Open · wants to merge 6 commits into master
7 changes: 6 additions & 1 deletion pkg/ddl/notifier/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
"//pkg/parser/model",
"//pkg/sessionctx",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/intest",
"//pkg/util/logutil",
"@com_github_pingcap_errors//:errors",
@@ -30,11 +31,12 @@ go_test(
timeout = "short",
srcs = [
"events_test.go",
"store_test.go",
"testkit_test.go",
],
embed = [":notifier"],
flaky = True,
shard_count = 7,
shard_count = 9,
deps = [
"//pkg/ddl",
"//pkg/ddl/session",
@@ -43,9 +45,12 @@
"//pkg/sessionctx",
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
"//pkg/types",
"//pkg/util",
"//pkg/util/chunk",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@org_uber_go_atomic//:atomic",
],
)
30 changes: 15 additions & 15 deletions pkg/ddl/notifier/publish.go
@@ -22,30 +22,30 @@ import (

// PubSchemeChangeToStore publishes schema changes to the store to notify
// subscribers on the Store. It stages changes in given `se` so they will be
// visible when `se` further commits. When the schema change is not from
// multi-schema change DDL, `multiSchemaChangeSeq` is -1. Otherwise,
// `multiSchemaChangeSeq` is the sub-job index of the multi-schema change DDL.
// visible when `se` further commits. When the DDL contains only one schema
// change, `subJobID` is -1. Otherwise, `subJobID` is the index of the sub-job
// within the DDL, such as a multi-schema change or a batched create table.
func PubSchemeChangeToStore(
ctx context.Context,
se *sess.Session,
ddlJobID int64,
multiSchemaChangeSeq int64,
subJobID int64,
event *SchemaChangeEvent,
store Store,
) error {
change := &schemaChange{
ddlJobID: ddlJobID,
multiSchemaChangeSeq: multiSchemaChangeSeq,
event: event,
change := &SchemaChange{
ddlJobID: ddlJobID,
subJobID: subJobID,
event: event,
}
return store.Insert(ctx, se, change)
}

// schemaChange is the Golang representation of the persistent data. (ddlJobID,
// multiSchemaChangeSeq) should be unique in the cluster.
type schemaChange struct {
ddlJobID int64
multiSchemaChangeSeq int64
event *SchemaChangeEvent
processedByFlag uint64
// SchemaChange is the Golang representation of the persistent data. (ddlJobID,
// subJobID) should be unique in the cluster.
type SchemaChange struct {
ddlJobID int64
subJobID int64
event *SchemaChangeEvent
processedByFlag uint64
}
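
For illustration, a minimal sketch of how a caller might publish events for a single-schema-change DDL versus a multi-schema-change DDL. The wrapper function below is hypothetical and not part of this PR; only PubSchemeChangeToStore and its parameters come from the diff above, and it is assumed to live in the notifier package.

// Hypothetical wrapper: a plain DDL publishes one event with subJobID = -1,
// while a multi-schema change / batched create table publishes one event per
// sub-job, keyed by its index so (ddlJobID, subJobID) stays unique.
func publishAll(ctx context.Context, se *sess.Session, store Store, ddlJobID int64, events []*SchemaChangeEvent) error {
	if len(events) == 1 {
		return PubSchemeChangeToStore(ctx, se, ddlJobID, -1, events[0], store)
	}
	for i, e := range events {
		if err := PubSchemeChangeToStore(ctx, se, ddlJobID, int64(i), e, store); err != nil {
			return err
		}
	}
	return nil
}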
125 changes: 98 additions & 27 deletions pkg/ddl/notifier/store.go
@@ -21,11 +21,15 @@ import (

"github.com/pingcap/errors"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/util/chunk"
)

// CloseFn is the function used to release the resources.
type CloseFn func()

// Store is the (de)serialization and persistent layer.
type Store interface {
Insert(context.Context, *sess.Session, *schemaChange) error
Insert(context.Context, *sess.Session, *SchemaChange) error
UpdateProcessed(
ctx context.Context,
se *sess.Session,
@@ -34,15 +38,29 @@ type Store interface {
processedBy uint64,
) error
DeleteAndCommit(ctx context.Context, se *sess.Session, ddlJobID int64, multiSchemaChangeID int) error
List(ctx context.Context, se *sess.Session) ([]*schemaChange, error)
// List starts a transaction on the given session and reads all schema changes
// through a ListResult. The Store holds ownership of the session until CloseFn
// is called.
List(ctx context.Context, se *sess.Session) (ListResult, CloseFn)
}

// ListResult is the result stream of a List operation.
type ListResult interface {
// Read tries to decode at most `len(changes)` SchemaChanges into the given
// slice. It returns the number of SchemaChanges decoded; 0 means there are no
// more SchemaChanges.
//
// Note that the SchemaChange entries already in the slice will be overwritten
// by the next call to Read.
Read(changes []*SchemaChange) (int, error)
}
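
As a rough usage sketch of the new interface (the drain function, batch size, and handler below are illustrative assumptions, not part of this diff), a consumer can page through the table with a small reusable buffer so memory stays bounded no matter how many pending changes exist:

// Hypothetical consumer inside the notifier package: reads schema changes page
// by page, reusing a fixed-size buffer across Read calls.
func drainStore(ctx context.Context, se *sess.Session, store Store, handle func(*SchemaChange) error) error {
	result, closeFn := store.List(ctx, se)
	// closeFn releases the session once we are done reading.
	defer closeFn()

	buf := make([]*SchemaChange, 8) // entries are overwritten by each Read call
	for {
		n, err := result.Read(buf)
		if err != nil {
			return err
		}
		if n == 0 {
			return nil // no more schema changes
		}
		for _, change := range buf[:n] {
			if err := handle(change); err != nil {
				return err
			}
		}
	}
}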

type tableStore struct {
db string
table string
}

func (t *tableStore) Insert(ctx context.Context, s *sess.Session, change *schemaChange) error {
// Insert implements Store interface.
func (t *tableStore) Insert(ctx context.Context, s *sess.Session, change *SchemaChange) error {
event, err := json.Marshal(change.event)
if err != nil {
return errors.Trace(err)
@@ -58,11 +76,12 @@ func (t *tableStore) Insert(ctx context.Context, s *sess.Session, change *schema
)
_, err = s.Execute(
ctx, sql, "ddl_notifier",
change.ddlJobID, change.multiSchemaChangeSeq, event,
change.ddlJobID, change.subJobID, event,
)
return err
}

// UpdateProcessed implements Store interface.
func (t *tableStore) UpdateProcessed(
ctx context.Context,
se *sess.Session,
@@ -81,6 +100,7 @@ func (t *tableStore) UpdateProcessed(
return err
}

// DeleteAndCommit implements Store interface.
func (t *tableStore) DeleteAndCommit(
ctx context.Context,
se *sess.Session,
@@ -106,34 +126,85 @@ func (t *tableStore) DeleteAndCommit(
return errors.Trace(err)
}

func (t *tableStore) List(ctx context.Context, se *sess.Session) ([]*schemaChange, error) {
sql := fmt.Sprintf(`
SELECT
ddl_job_id,
sub_job_id,
schema_change,
processed_by_flag
FROM %s.%s ORDER BY ddl_job_id, sub_job_id`,
t.db, t.table)
rows, err := se.Execute(ctx, sql, "ddl_notifier")
// List implements Store interface.
func (t *tableStore) List(ctx context.Context, se *sess.Session) (ListResult, CloseFn) {
return &listResult{
ctx: ctx,
se: se,
sqlTemplate: fmt.Sprintf(`
SELECT
ddl_job_id,
sub_job_id,
schema_change,
processed_by_flag
FROM %s.%s
WHERE (ddl_job_id, sub_job_id) > (%%?, %%?)
ORDER BY ddl_job_id, sub_job_id
LIMIT %%?`,
t.db, t.table),
// DDL job IDs are always positive, so we can use 0 as the initial value.
maxReturnedDDLJobID: 0,
maxReturnedSubJobID: 0,
}, se.Rollback
}
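
The query above implements keyset pagination: instead of OFFSET, each page resumes strictly after the last (ddl_job_id, sub_job_id) pair returned by the previous page. In Go terms, the row-constructor predicate is equivalent to the following comparison (a sketch for illustration only, not part of this diff):

// Equivalent of the SQL predicate (ddl_job_id, sub_job_id) > (lastJobID, lastSubJobID)
// for non-NULL values: a later job ID, or the same job ID with a later sub-job ID.
func afterCursor(ddlJobID, subJobID, lastJobID, lastSubJobID int64) bool {
	return ddlJobID > lastJobID ||
		(ddlJobID == lastJobID && subJobID > lastSubJobID)
}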

type listResult struct {
ctx context.Context
se *sess.Session
sqlTemplate string
maxReturnedDDLJobID int64
maxReturnedSubJobID int64
}

// Read implements ListResult interface.
func (r *listResult) Read(changes []*SchemaChange) (int, error) {
if r.maxReturnedDDLJobID == 0 && r.maxReturnedSubJobID == 0 {
err := r.se.Begin(r.ctx)
if err != nil {
return 0, errors.Trace(err)
}
}

rows, err := r.se.Execute(
r.ctx, r.sqlTemplate, "ddl_notifier",
r.maxReturnedDDLJobID, r.maxReturnedSubJobID, len(changes),
)
if err != nil {
return nil, err
return 0, errors.Trace(err)
}
ret := make([]*schemaChange, 0, len(rows))
for _, row := range rows {
event := SchemaChangeEvent{}
err = json.Unmarshal(row.GetBytes(2), &event)

if err = r.unmarshalSchemaChanges(rows, changes); err != nil {
return 0, errors.Trace(err)
}
return len(rows), nil
}

func (r *listResult) unmarshalSchemaChanges(rows []chunk.Row, changes []*SchemaChange) error {
for i, row := range rows {
if changes[i] == nil {
changes[i] = new(SchemaChange)
}
if changes[i].event == nil {
changes[i].event = new(SchemaChangeEvent)
}
if changes[i].event.inner == nil {
changes[i].event.inner = new(jsonSchemaChangeEvent)
}
Comment on lines +184 to +192

@Rustin170506 (Member) commented on Dec 23, 2024:

How about:

// Add constructor function
func NewSchemaChange() *SchemaChange {
    return &SchemaChange{
        event: &SchemaChangeEvent{
            inner: &jsonSchemaChangeEvent{},
        },
    }
}

if changes[i] == nil {
    changes[i] = NewSchemaChange()
}

@Rustin170506 (Member) commented:

Or do you mean there is a case that changes[i] is not nil, but the event or the inner is nil?

Contributor (Author) commented:

just to improve robustness, I don't want to assume caller uses some kind of SchemaChange.

I'll test the OOM problem soon


err := json.Unmarshal(row.GetBytes(2), changes[i].event.inner)
if err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
changes[i].ddlJobID = row.GetInt64(0)
changes[i].subJobID = row.GetInt64(1)
changes[i].processedByFlag = row.GetUint64(3)

if i == len(rows)-1 {
r.maxReturnedDDLJobID = changes[i].ddlJobID
r.maxReturnedSubJobID = changes[i].subJobID
}
ret = append(ret, &schemaChange{
ddlJobID: row.GetInt64(0),
multiSchemaChangeSeq: row.GetInt64(1),
event: &event,
processedByFlag: row.GetUint64(3),
})
}
return ret, nil
return nil
}

// OpenTableStore opens a store on a created table `db`.`table`. The table should
80 changes: 80 additions & 0 deletions pkg/ddl/notifier/store_test.go
@@ -0,0 +1,80 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package notifier

import (
"encoding/json"
"testing"

"github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/stretchr/testify/require"
)

func TestLeftoverWhenUnmarshal(t *testing.T) {
r := &listResult{}
changesReused := []*SchemaChange{
{event: &SchemaChangeEvent{inner: &jsonSchemaChangeEvent{
TableInfo: &model.TableInfo{
Name: pmodel.NewCIStr("old"),
Columns: []*model.ColumnInfo{{Name: pmodel.NewCIStr("c1")}},
Indices: []*model.IndexInfo{
{Name: pmodel.NewCIStr("i1")},
{Name: pmodel.NewCIStr("i2")},
{Name: pmodel.NewCIStr("i3")},
},
},
}}},
{event: &SchemaChangeEvent{inner: &jsonSchemaChangeEvent{
AddedPartInfo: &model.PartitionInfo{Expr: "test"},
}}},
nil,
}

newTableInfo := &model.TableInfo{
Name: pmodel.NewCIStr("new"),
Columns: []*model.ColumnInfo{
{Name: pmodel.NewCIStr("c2")},
{Name: pmodel.NewCIStr("c3")},
},
Indices: []*model.IndexInfo{
{Name: pmodel.NewCIStr("i4")},
},
Constraints: []*model.ConstraintInfo{
{Name: pmodel.NewCIStr("c1")},
},
}

newTableInfoJSON, err := json.Marshal(jsonSchemaChangeEvent{TableInfo: newTableInfo})
require.NoError(t, err)
sameRow := chunk.MutRowFromDatums([]types.Datum{
types.NewIntDatum(1), types.NewIntDatum(1),
types.NewBytesDatum(newTableInfoJSON), types.NewUintDatum(0),
}).ToRow()
rows := []chunk.Row{sameRow, sameRow, sameRow}

err = r.unmarshalSchemaChanges(rows, changesReused)
require.NoError(t, err)

require.Equal(t, newTableInfo, changesReused[0].event.inner.TableInfo)
require.Equal(t, newTableInfo, changesReused[1].event.inner.TableInfo)
// The leftover is not cleaned right after unmarshal; it will be cleaned up by
// GC later. Because we use the type field to determine which field to read,
// the leftover will not affect the result.
require.NotNil(t, changesReused[1].event.inner.AddedPartInfo)
require.Equal(t, newTableInfo, changesReused[2].event.inner.TableInfo)
}