Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

partition: make ExchangePartition follow check constraints during writeOnly state(Part2) #46030

Merged
merged 20 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,10 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...
// Keep this as Schema ID of non-partitioned table
// to avoid trigger early rename in TiFlash
diff.AffectedOpts[0].SchemaID = job.SchemaID
// Need reload partition table, use diff.AffectedOpts[0].OldSchemaID to mark it.
if len(multiInfos) > 0 {
diff.AffectedOpts[0].OldSchemaID = ptSchemaID
}
} else {
// Swap
diff.TableID = ptDefID
Expand Down
18 changes: 15 additions & 3 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2458,16 +2458,27 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
return ver, errors.Trace(err)
}
}
var ptInfo []schemaIDAndTableInfo
if len(nt.Constraints) > 0 {
pt.ExchangePartitionInfo = &model.ExchangePartitionInfo{
ExchangePartitionTableID: nt.ID,
ExchangePartitionDefID: defID,
}
ptInfo = append(ptInfo, schemaIDAndTableInfo{
schemaID: ptSchemaID,
tblInfo: pt,
})
}
nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{
ExchangePartitionID: ptID,
ExchangePartitionDefID: defID,
ExchangePartitionTableID: ptID,
ExchangePartitionDefID: defID,
}
// We need an interim schema version,
// so there are no non-matching rows inserted
// into the table using the schema version
// before the exchange is made.
job.SchemaState = model.StateWriteOnly
return updateVersionAndTableInfoWithCheck(d, t, job, nt, true)
return updateVersionAndTableInfoWithCheck(d, t, job, nt, true, ptInfo...)
}
// From now on, nt (the non-partitioned table) has
// ExchangePartitionInfo set, meaning it is restricted
Expand Down Expand Up @@ -2527,6 +2538,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
}

// exchange table meta id
pt.ExchangePartitionInfo = nil
partDef.ID, nt.ID = nt.ID, partDef.ID

err = t.UpdateTable(ptSchemaID, pt)
Expand Down
28 changes: 26 additions & 2 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,35 @@ func needNotifyAndStopReorgWorker(job *model.Job) bool {

// rollbackExchangeTablePartition will clear the non-partitioned
// table's ExchangePartitionInfo state.
func rollbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo) (int64, error) {
func rollbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo) (ver int64, err error) {
tblInfo.ExchangePartitionInfo = nil
job.State = model.JobStateRollbackDone
job.SchemaState = model.StatePublic
return updateVersionAndTableInfo(d, t, job, tblInfo, true)
if len(tblInfo.Constraints) == 0 {
return updateVersionAndTableInfo(d, t, job, tblInfo, true)
}
var (
defID int64
ptSchemaID int64
ptID int64
partName string
withValidation bool
)
if err = job.DecodeArgs(&defID, &ptSchemaID, &ptID, &partName, &withValidation); err != nil {
return ver, errors.Trace(err)
}
pt, err := getTableInfo(t, ptID, ptSchemaID)
if err != nil {
return ver, errors.Trace(err)
}
pt.ExchangePartitionInfo = nil
var ptInfo []schemaIDAndTableInfo
ptInfo = append(ptInfo, schemaIDAndTableInfo{
schemaID: ptSchemaID,
tblInfo: pt,
})
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true, ptInfo...)
return ver, errors.Trace(err)
}

func rollingbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
Expand Down
24 changes: 5 additions & 19 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/executor/internal/exec"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -690,28 +689,15 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue
}
}
}
tbl := e.Table.Meta()

// Handle exchange partition
if tbl.ExchangePartitionInfo != nil {
is := e.Ctx().GetDomainInfoSchema().(infoschema.InfoSchema)
pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID)
if !tableFound {
return nil, errors.Errorf("exchange partition process table by id failed")
}
p, ok := pt.(table.PartitionedTable)
if !ok {
return nil, errors.Errorf("exchange partition process assert table partition failed")
}
err := p.CheckForExchangePartition(
e.Ctx(),
pt.Meta().Partition,
row,
tbl.ExchangePartitionInfo.ExchangePartitionDefID,
)
if err != nil {
tbl := e.Table.Meta()
if tbl.ExchangePartitionInfo != nil && tbl.GetPartitionInfo() == nil {
if err := checkRowForExchangePartition(e.Ctx(), row, tbl); err != nil {
return nil, err
}
}

mjonss marked this conversation as resolved.
Show resolved Hide resolved
sc := e.Ctx().GetSessionVars().StmtCtx
warnCnt := int(sc.WarningCount())
for i, gCol := range gCols {
Expand Down
60 changes: 43 additions & 17 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -79,23 +81,8 @@ func updateRecord(

// Handle exchange partition
tbl := t.Meta()
if tbl.ExchangePartitionInfo != nil {
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID)
if !tableFound {
return false, errors.Errorf("exchange partition process table by id failed")
}
p, ok := pt.(table.PartitionedTable)
if !ok {
return false, errors.Errorf("exchange partition process assert table partition failed")
}
err := p.CheckForExchangePartition(
sctx,
pt.Meta().Partition,
newData,
tbl.ExchangePartitionInfo.ExchangePartitionDefID,
)
if err != nil {
if tbl.ExchangePartitionInfo != nil && tbl.GetPartitionInfo() == nil {
if err := checkRowForExchangePartition(sctx, newData, tbl); err != nil {
mjonss marked this conversation as resolved.
Show resolved Hide resolved
return false, err
}
}
Expand Down Expand Up @@ -326,3 +313,42 @@ func resetErrDataTooLong(colName string, rowIdx int, _ error) error {
newErr := types.ErrDataTooLong.GenWithStack("Data too long for column '%v' at row %v", colName, rowIdx)
return newErr
}

// checkRowForExchangePartition is only used for ExchangePartition by non-partitionTable during write only state.
// It check if rowData inserted or updated violate partition definition or checkConstraints of partitionTable.
func checkRowForExchangePartition(sctx sessionctx.Context, row []types.Datum, tbl *model.TableInfo) error {
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionTableID)
if !tableFound {
return errors.Errorf("exchange partition process table by id failed")
}
p, ok := pt.(table.PartitionedTable)
if !ok {
return errors.Errorf("exchange partition process assert table partition failed")
}
err := p.CheckForExchangePartition(
sctx,
pt.Meta().Partition,
row,
tbl.ExchangePartitionInfo.ExchangePartitionDefID,
tbl.ID,
)
if err != nil {
return err
}
if variable.EnableCheckConstraint.Load() {
type CheckConstraintTable interface {
CheckRowConstraint(sctx sessionctx.Context, rowToCheck []types.Datum) error
}
cc, ok := pt.(CheckConstraintTable)
if !ok {
return errors.Errorf("exchange partition process assert check constraint failed")
}
err := cc.CheckRowConstraint(sctx, row)
if err != nil {
// TODO: make error include ExchangePartition info.
return err
}
}
return nil
}
26 changes: 24 additions & 2 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,31 @@ func (b *Builder) applyReorganizePartition(m *meta.Meta, diff *model.SchemaDiff)
}

func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
// The partitioned table is not affected until the last stage
// It is not in StatePublic.
if diff.OldTableID == diff.TableID && diff.OldSchemaID == diff.SchemaID {
return b.applyTableUpdate(m, diff)
ntIDs, err := b.applyTableUpdate(m, diff)
if err != nil {
return nil, errors.Trace(err)
}
if diff.AffectedOpts == nil || diff.AffectedOpts[0].OldSchemaID == 0 {
return ntIDs, err
}
// Reload parition tabe.
ptSchemaID := diff.AffectedOpts[0].OldSchemaID
ptID := diff.AffectedOpts[0].TableID
ptDiff := &model.SchemaDiff{
Type: diff.Type,
Version: diff.Version,
TableID: ptID,
SchemaID: ptSchemaID,
OldTableID: ptID,
OldSchemaID: ptSchemaID,
}
ptIDs, err := b.applyTableUpdate(m, ptDiff)
if err != nil {
return nil, errors.Trace(err)
}
return append(ptIDs, ntIDs...), nil
}
ntSchemaID := diff.OldSchemaID
ntID := diff.OldTableID
Expand Down
5 changes: 3 additions & 2 deletions parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,8 +1168,9 @@ func (p PartitionType) String() string {

// ExchangePartitionInfo provides exchange partition info.
type ExchangePartitionInfo struct {
ExchangePartitionID int64 `json:"exchange_partition_id"`
ExchangePartitionDefID int64 `json:"exchange_partition_def_id"`
// It is nt tableID when table which has the info is a partition table, else pt tableID.
ExchangePartitionTableID int64 `json:"exchange_partition_id"`
ExchangePartitionDefID int64 `json:"exchange_partition_def_id"`
// Deprecated, not used
XXXExchangePartitionFlag bool `json:"exchange_partition_flag"`
}
Expand Down
2 changes: 1 addition & 1 deletion table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ type PartitionedTable interface {
GetAllPartitionIDs() []int64
GetPartitionColumnIDs() []int64
GetPartitionColumnNames() []model.CIStr
CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) error
CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, partID, ntID int64) error
}

// TableFromMeta builds a table.Table from *model.TableInfo.
Expand Down
54 changes: 52 additions & 2 deletions table/tables/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -1268,12 +1269,12 @@ func PartitionRecordKey(pid int64, handle int64) kv.Key {
return tablecodec.EncodeRecordKey(recordPrefix, kv.IntHandle(handle))
}

func (t *partitionedTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) error {
func (t *partitionedTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, partID, ntID int64) error {
defID, err := t.locatePartition(ctx, r)
if err != nil {
return err
}
if defID != pid {
if defID != partID && defID != ntID {
return errors.WithStack(table.ErrRowDoesNotMatchGivenPartitionSet)
}
return nil
Expand Down Expand Up @@ -1551,6 +1552,39 @@ func (t *partitionTableWithGivenSets) GetPartitionByRow(ctx sessionctx.Context,
return t.partitions[pid], nil
}

// checkConstraintForExchangePartition is only used for ExchangePartition by partitionTable during write only state.
// It check if rowData inserted or updated violate checkConstraints of non-partitionTable.
func checkConstraintForExchangePartition(sctx sessionctx.Context, row []types.Datum, partID, ntID int64) error {
type InfoSchema interface {
TableByID(id int64) (val table.Table, ok bool)
}
is, ok := sctx.GetDomainInfoSchema().(InfoSchema)
if !ok {
return errors.Errorf("exchange partition process assert inforSchema failed")
}
nt, tableFound := is.TableByID(ntID)
if !tableFound {
// Now partID is nt tableID.
nt, tableFound = is.TableByID(partID)
if !tableFound {
return errors.Errorf("exchange partition process table by id failed")
}
}
type CheckConstraintTable interface {
CheckRowConstraint(sctx sessionctx.Context, rowToCheck []types.Datum) error
}
cc, ok := nt.(CheckConstraintTable)
if !ok {
return errors.Errorf("exchange partition process assert check constraint failed")
}
err := cc.CheckRowConstraint(sctx, row)
if err != nil {
// TODO: make error include ExchangePartition info.
return err
}
return nil
}

// AddRecord implements the AddRecord method for the table.Table interface.
func (t *partitionedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) {
return partitionedTableAddRecord(ctx, t, r, nil, opts)
Expand All @@ -1570,6 +1604,14 @@ func partitionedTableAddRecord(ctx sessionctx.Context, t *partitionedTable, r []
if t.Meta().Partition.HasTruncatingPartitionID(pid) {
return nil, errors.WithStack(dbterror.ErrInvalidDDLState.GenWithStack("the partition is in not in public"))
}
exchangePartitionInfo := t.Meta().ExchangePartitionInfo
if exchangePartitionInfo != nil && exchangePartitionInfo.ExchangePartitionDefID == pid &&
variable.EnableCheckConstraint.Load() {
err = checkConstraintForExchangePartition(ctx, r, pid, exchangePartitionInfo.ExchangePartitionTableID)
if err != nil {
return nil, errors.WithStack(err)
}
}
tbl := t.GetPartition(pid)
recordID, err = tbl.AddRecord(ctx, r, opts...)
if err != nil {
Expand Down Expand Up @@ -1695,6 +1737,14 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx sessionctx.Context,
if t.Meta().Partition.HasTruncatingPartitionID(to) {
return errors.WithStack(dbterror.ErrInvalidDDLState.GenWithStack("the partition is in not in public"))
}
exchangePartitionInfo := t.Meta().ExchangePartitionInfo
if exchangePartitionInfo != nil && exchangePartitionInfo.ExchangePartitionDefID == to &&
variable.EnableCheckConstraint.Load() {
err = checkConstraintForExchangePartition(ctx, newData, to, exchangePartitionInfo.ExchangePartitionTableID)
if err != nil {
return errors.WithStack(err)
}
}

// The old and new data locate in different partitions.
// Remove record from old partition and add record to new partition.
Expand Down
Loading