Skip to content

Commit

Permalink
ddl:set the low priority for the adding the index (#5976)
Browse files Browse the repository at this point in the history
  • Loading branch information
ciscoxll authored and winkyao committed Mar 9, 2018
1 parent a3bf058 commit bf12771
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 3 deletions.
5 changes: 4 additions & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,7 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
if err != nil {
// Update the reorg handle that has been processed.
err1 := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {

return errors.Trace(reorgInfo.UpdateHandle(txn, nextHandle))
})
log.Warnf("[ddl] total added index for %d rows, this task [%d,%d) add index for %d failed %v, batch %d, take time %v, update handle err %v",
Expand Down Expand Up @@ -655,6 +656,7 @@ func (w *worker) doBackfillIndexTask(t table.Table, colMap map[int64]*types.Fiel
startTime := time.Now()
var ret *taskResult
err := kv.RunInNewTxn(w.ctx.GetStore(), true, func(txn kv.Transaction) error {
txn.SetOption(kv.Priority, kv.PriorityLow)
ret = w.doBackfillIndexTaskInTxn(t, txn, colMap)
return errors.Trace(ret.err)
})
Expand Down Expand Up @@ -717,11 +719,12 @@ type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, e

func iterateSnapshotRows(store kv.Storage, t table.Table, version uint64, seekHandle int64, fn recordIterFunc) error {
ver := kv.Version{Ver: version}

snap, err := store.GetSnapshot(ver)
snap.SetPriority(kv.PriorityLow)
if err != nil {
return errors.Trace(err)
}

firstKey := t.RecordKey(seekHandle)
it, err := snap.Seek(firstKey)
if err != nil {
Expand Down
55 changes: 53 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2000,6 +2000,7 @@ const (
checkRequestPriority = 1
checkRequestNotFillCache = 2
checkRequestSyncLog = 3
checkDDLAddIndexPriority = 4
)

type checkRequestClient struct {
Expand All @@ -2008,8 +2009,9 @@ type checkRequestClient struct {
notFillCache bool
mu struct {
sync.RWMutex
checkFlags uint32
syncLog bool
checkFlags uint32
lowPriorityCnt uint32
syncLog bool
}
}

Expand Down Expand Up @@ -2039,6 +2041,16 @@ func (c *checkRequestClient) SendReq(ctx context.Context, addr string, req *tikv
return nil, errors.New("fail to set sync log")
}
}
} else if checkFlags == checkDDLAddIndexPriority {
if req.Type == tikvrpc.CmdScan {
if c.priority != req.Priority {
return nil, errors.New("fail to set priority")
}
} else if req.Type == tikvrpc.CmdPrewrite {
if c.priority == pb.CommandPri_Low {
c.mu.lowPriorityCnt++
}
}
}
return resp, err
}
Expand Down Expand Up @@ -2071,6 +2083,45 @@ func (s *testContextOptionSuite) TearDownSuite(c *C) {
s.store.Close()
}

func (s *testContextOptionSuite) TestAddIndexPriority(c *C) {
cli := &checkRequestClient{}
hijackClient := func(c tikv.Client) tikv.Client {
cli.Client = c
return cli
}

store, err := mockstore.NewMockTikvStore(
mockstore.WithHijackClient(hijackClient),
)
c.Assert(err, IsNil)
dom, err := tidb.BootstrapSession(store)
c.Assert(err, IsNil)

tk := testkit.NewTestKit(c, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (id int, v int)")

// Insert some data to make sure plan build IndexLookup for t1.
for i := 0; i < 10; i++ {
tk.MustExec(fmt.Sprintf("insert into t1 values (%d, %d)", i, i))
}

cli.mu.Lock()
cli.mu.checkFlags = checkDDLAddIndexPriority
cli.mu.Unlock()

cli.priority = pb.CommandPri_Low
tk.MustExec("alter table t1 add index t1_index (id);")

c.Assert(cli.mu.lowPriorityCnt > 0, IsTrue)

cli.mu.Lock()
cli.mu.checkFlags = checkRequestOff
cli.mu.Unlock()
dom.Close()
store.Close()
}

func (s *testContextOptionSuite) TestCoprocessorPriority(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ type Snapshot interface {
Retriever
// BatchGet gets a batch of values from snapshot.
BatchGet(keys []Key) (map[string][]byte, error)
// SetPriority snapshot set the priority
SetPriority(priority int)
}

// Driver is the interface that must be implemented by a KV storage.
Expand Down
4 changes: 4 additions & 0 deletions kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ func (s *mockSnapshot) Get(k Key) ([]byte, error) {
return s.store.Get(k)
}

func (s *mockSnapshot) SetPriority(priority int) {

}

func (s *mockSnapshot) BatchGet(keys []Key) (map[string][]byte, error) {
m := make(map[string][]byte)
for _, k := range keys {
Expand Down
4 changes: 4 additions & 0 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func newTiKVSnapshot(store *tikvStore, ver kv.Version) *tikvSnapshot {
}
}

func (s *tikvSnapshot) SetPriority(priority int) {
s.priority = pb.CommandPri(priority)
}

// BatchGet gets all the keys' value from kv-server and returns a map contains key/value pairs.
// The map will not contain nonexistent keys.
func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) {
Expand Down

0 comments on commit bf12771

Please sign in to comment.