Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runaway: only check statements with a non-empty plan digest (#57582) #57599

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions pkg/executor/internal/querywatch/query_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,22 @@ func TestQueryWatch(t *testing.T) {
time.Sleep(1 * time.Second)
tk.MustGetErrCode("select * from test.t1", mysql.ErrResourceGroupQueryRunawayQuarantine)
}

func TestQueryWatchIssue56897(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC"))
}()
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
require.Eventually(t, func() bool {
return dom.RunawayManager().IsSyncerInitialized()
}, 20*time.Second, 300*time.Millisecond)
tk.MustQuery("QUERY WATCH ADD ACTION KILL SQL TEXT SIMILAR TO 'use test';").Check((testkit.Rows("1")))
time.Sleep(1 * time.Second)
_, err := tk.Exec("use test")
require.Nil(t, err)
_, err = tk.Exec("use mysql")
require.Nil(t, err)
}
66 changes: 35 additions & 31 deletions pkg/resourcegroup/runaway/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ type Checker struct {
// From the group runaway settings, which will be applied when a query lacks a specified watch rule.
settings *rmpb.RunawaySettings

// markedByRule is set to true when the query matches the group runaway settings.
markedByRule atomic.Bool
// markedByWatch is set to true when the query matches the specified watch rules.
markedByWatch bool
// markedByIdentifyInRunawaySettings is set to true when the query matches the group runaway settings.
markedByIdentifyInRunawaySettings atomic.Bool
// markedByQueryWatchRule is set to true when the query matches the specified watch rules.
markedByQueryWatchRule bool
// watchAction is the specified watch action for the runaway query.
// If it's not given, the action defined in `settings` will be used.
watchAction rmpb.RunawayAction
Expand All @@ -65,14 +65,14 @@ func NewChecker(
originalSQL, sqlDigest, planDigest string, startTime time.Time,
) *Checker {
c := &Checker{
manager: manager,
resourceGroupName: resourceGroupName,
originalSQL: originalSQL,
sqlDigest: sqlDigest,
planDigest: planDigest,
settings: settings,
markedByRule: atomic.Bool{},
markedByWatch: false,
manager: manager,
resourceGroupName: resourceGroupName,
originalSQL: originalSQL,
sqlDigest: sqlDigest,
planDigest: planDigest,
settings: settings,
markedByIdentifyInRunawaySettings: atomic.Bool{},
markedByQueryWatchRule: false,
}
if settings != nil {
// avoid setting deadline if the threshold is 0
Expand All @@ -92,6 +92,10 @@ func (rm *Manager) DeriveChecker(resourceGroupName, originalSQL, sqlDigest, plan
logutil.BgLogger().Warn("cannot setup up runaway checker", zap.Error(err))
return nil
}
// Only check the normal statement.
if len(planDigest) == 0 {
return nil
}
rm.ActiveLock.RLock()
defer rm.ActiveLock.RUnlock()
if group.RunawaySettings == nil && rm.ActiveGroup[resourceGroupName] == 0 {
Expand Down Expand Up @@ -129,7 +133,7 @@ func (r *Checker) BeforeExecutor() (string, error) {
switchGroupName = r.settings.SwitchGroupName
}
// Mark it if this is the first time being watched.
r.markRunawayByWatch(action, switchGroupName, exceedCause)
r.markRunawayByQueryWatchRule(action, switchGroupName, exceedCause)
// Take action if needed.
switch action {
case rmpb.RunawayAction_Kill:
Expand Down Expand Up @@ -157,16 +161,16 @@ func (r *Checker) BeforeCopRequest(req *tikvrpc.Request) error {
return nil
}
// If the group settings are not available, and it's not marked by watch, skip this part.
if r.settings == nil && !r.markedByWatch {
if r.settings == nil && !r.markedByQueryWatchRule {
return nil
}
// If it's marked by watch and the action is cooldown, override the priority,
if r.markedByWatch && r.watchAction == rmpb.RunawayAction_CoolDown {
if r.markedByQueryWatchRule && r.watchAction == rmpb.RunawayAction_CoolDown {
req.ResourceControlContext.OverridePriority = 1 // set priority to lowest
}
// If group settings are available and the query is not marked by a rule,
// verify if it matches any rules in the settings.
if r.settings != nil && !r.markedByRule.Load() {
if r.settings != nil && !r.markedByIdentifyInRunawaySettings.Load() {
now := time.Now()
// only check time and need to ensure deadline existed.
exceedCause := r.exceedsThresholds(now, nil, 0)
Expand All @@ -181,7 +185,7 @@ func (r *Checker) BeforeCopRequest(req *tikvrpc.Request) error {
return nil
}
// execution time exceeds the threshold, mark the query as runaway
r.markRunawayByIdentify(&now, exceedCause)
r.markRunawayByIdentifyInRunawaySettings(&now, exceedCause)
// Take action if needed.
switch r.settings.Action {
case rmpb.RunawayAction_Kill:
Expand All @@ -205,10 +209,10 @@ func (r *Checker) CheckAction() rmpb.RunawayAction {
if r == nil {
return rmpb.RunawayAction_NoneAction
}
if r.markedByWatch {
if r.markedByQueryWatchRule {
return r.watchAction
}
if r.markedByRule.Load() {
if r.markedByIdentifyInRunawaySettings.Load() {
return r.settings.Action
}
return rmpb.RunawayAction_NoneAction
Expand All @@ -217,17 +221,17 @@ func (r *Checker) CheckAction() rmpb.RunawayAction {
// CheckRuleKillAction checks whether the query should be killed according to the group settings.
func (r *Checker) CheckRuleKillAction() (string, bool) {
// If the group settings are not available, and it's not marked by watch, skip this part.
if r == nil || r.settings == nil && !r.markedByWatch {
if r == nil || r.settings == nil && !r.markedByQueryWatchRule {
return "", false
}
// If the group settings are available, and it's not marked by rule, check the execution time.
if r.settings != nil && !r.markedByRule.Load() {
if r.settings != nil && !r.markedByIdentifyInRunawaySettings.Load() {
now := time.Now()
exceedCause := r.exceedsThresholds(now, nil, 0)
if exceedCause == "" {
return "", false
}
r.markRunawayByIdentify(&now, exceedCause)
r.markRunawayByIdentifyInRunawaySettings(&now, exceedCause)
return exceedCause, r.settings.Action == rmpb.RunawayAction_Kill
}
return "", false
Expand All @@ -243,19 +247,19 @@ func (r *Checker) markQuarantine(now *time.Time, exceedCause string) {
r.settings.Action, r.settings.SwitchGroupName, ttl, now, exceedCause)
}

func (r *Checker) markRunawayByIdentify(now *time.Time, exceedCause string) bool {
swapped := r.markedByRule.CompareAndSwap(false, true)
func (r *Checker) markRunawayByIdentifyInRunawaySettings(now *time.Time, exceedCause string) bool {
swapped := r.markedByIdentifyInRunawaySettings.CompareAndSwap(false, true)
if swapped {
r.markRunaway("identify", r.settings.Action, r.settings.SwitchGroupName, now, exceedCause)
if !r.markedByWatch {
if !r.markedByQueryWatchRule {
r.markQuarantine(now, exceedCause)
}
}
return swapped
}

func (r *Checker) markRunawayByWatch(action rmpb.RunawayAction, switchGroupName, exceedCause string) {
r.markedByWatch = true
func (r *Checker) markRunawayByQueryWatchRule(action rmpb.RunawayAction, switchGroupName, exceedCause string) {
r.markedByQueryWatchRule = true
r.watchAction = action
now := time.Now()
r.markRunaway("watch", action, switchGroupName, &now, exceedCause)
Expand Down Expand Up @@ -326,15 +330,15 @@ func (r *Checker) CheckThresholds(ruDetail *util.RUDetails, processKeys int64, e
// add the processed keys to the total processed keys.
r.totalProcessedKeys += processKeys
exceedCause := r.exceedsThresholds(checkTime, ruDetail, r.totalProcessedKeys)
if !r.markedByRule.Load() {
if exceedCause != "" && r.markRunawayByIdentify(&now, exceedCause) {
if r.markRunawayByIdentify(&now, exceedCause) {
if !r.markedByIdentifyInRunawaySettings.Load() {
if exceedCause != "" && r.markRunawayByIdentifyInRunawaySettings(&now, exceedCause) {
if r.markRunawayByIdentifyInRunawaySettings(&now, exceedCause) {
return exeerrors.ErrResourceGroupQueryRunawayInterrupted.FastGenByArgs(exceedCause)
}
}
}
// Due to concurrency, check again.
if r.markedByRule.Load() {
if r.markedByIdentifyInRunawaySettings.Load() {
return exeerrors.ErrResourceGroupQueryRunawayInterrupted.FastGenByArgs(exceedCause)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/store/copr/copr_test/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func TestBuildCopIteratorWithRunawayChecker(t *testing.T) {

sql := "select * from t"
group1 := "rg1"
checker := manager.DeriveChecker(group1, sql, "", "", time.Now())
checker := manager.DeriveChecker(group1, sql, "test", "test", time.Now())
manager.AddWatch(&runaway.QuarantineRecord{
ID: 1,
ResourceGroupName: group1,
Expand Down