Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrent deployments bug #502

Merged
merged 3 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"os/user"
"path/filepath"
"strings"
"time"

"github.com/AliceO2Group/Control/apricot"
apricotpb "github.com/AliceO2Group/Control/apricot/protos"
Expand Down Expand Up @@ -120,6 +121,7 @@ func setDefaults() error {
viper.SetDefault("concurrentIteratorRoleExpansion", true)
viper.SetDefault("reuseUnlockedTasks", false)
viper.SetDefault("configCache", true)
viper.SetDefault("taskClassCacheTTL", 7*24*time.Hour)
return nil
}

Expand Down Expand Up @@ -183,6 +185,7 @@ func setFlags() error {
pflag.Bool("concurrentIteratorRoleExpansion", viper.GetBool("concurrentIteratorRoleExpansion"), "Expand iterator roles concurrently during workflow template processing")
pflag.Bool("reuseUnlockedTasks", viper.GetBool("reuseUnlockedTasks"), "Reuse unlocked active tasks when satisfying environment deployment requests")
pflag.Bool("configCache", viper.GetBool("configCache"), "Enable cache layer between AliECS core and Apricot")
pflag.Duration("taskClassCacheTTL", viper.GetDuration("taskClassCacheTTL"), "TTL for task class cache entries")

pflag.Parse()
return viper.BindPFlags(pflag.CommandLine)
Expand Down
24 changes: 23 additions & 1 deletion core/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,18 @@ func (m *Manager) removeInactiveClasses() {
_ = m.classes.Do(func(classMap *map[string]*taskclass.Class) error {
keys := make([]string, 0)

taskClassCacheTTL := viper.GetDuration("taskClassCacheTTL")

// push keys of classes that don't appear in roster any more into a slice
for taskClassIdentifier := range *classMap {
for taskClassIdentifier, class := range *classMap {
if class == nil {
// don't really know what to do with a valid TCI but nil class
continue
}
if time.Since(class.UpdatedTimestamp) < taskClassCacheTTL {
// class is still fresh, skip
continue
}
if len(m.roster.filteredForClass(taskClassIdentifier)) == 0 {
keys = append(keys, taskClassIdentifier)
}
Expand Down Expand Up @@ -286,9 +296,21 @@ func (m *Manager) RemoveReposClasses(repoPath string) { //Currently unused
}

func (m *Manager) RefreshClasses(taskClassesRequired []string) (err error) {
log.WithField("taskClassesRequired", len(taskClassesRequired)).
Debug("waiting to refresh task classes")
defer utils.TimeTrackFunction(time.Now(), log.WithField("taskClassesRequired", len(taskClassesRequired)))

m.deployMu.Lock()
defer m.deployMu.Unlock()

log.WithField("taskClassesRequired", len(taskClassesRequired)).
Debug("cleaning up inactive task classes")

m.removeInactiveClasses()

log.WithField("taskClassesRequired", len(taskClassesRequired)).
Debug("loading required task classes")

var taskClassList []*taskclass.Class
taskClassList, err = getTaskClassList(taskClassesRequired)
if err != nil {
Expand Down
224 changes: 37 additions & 187 deletions core/task/taskclass/class.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2018 CERN and copyright holders of ALICE O².
* Copyright 2018-2023 CERN and copyright holders of ALICE O².
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
*
* This program is free software: you can redistribute it and/or modify
Expand All @@ -26,21 +26,34 @@ package taskclass

import (
"fmt"
"strconv"
"sync"
"time"

"github.com/AliceO2Group/Control/common"
"github.com/AliceO2Group/Control/common/controlmode"
"github.com/AliceO2Group/Control/common/gera"
"github.com/AliceO2Group/Control/common/logger"
"github.com/AliceO2Group/Control/core/task/channel"
"github.com/AliceO2Group/Control/core/task/constraint"
"github.com/AliceO2Group/Control/core/task/taskclass/port"
"github.com/sirupsen/logrus"
)

var log = logger.New(logrus.StandardLogger(), "taskclass")

// Id uniquely identifies a task class by the repository it comes from,
// its name within that repository, and a revision hash.
type Id struct {
	RepoIdentifier string
	Hash           string
	Name           string
}

// String renders the identifier as "<repo>/tasks/<name>@<hash>".
func (tcID Id) String() string {
	// fmt.Sprint concatenates adjacent string operands without separators.
	return fmt.Sprint(tcID.RepoIdentifier, "/tasks/", tcID.Name, "@", tcID.Hash)
}

// UnmarshalYAML decodes a plain YAML scalar into Name only; RepoIdentifier
// and Hash are not set here.
func (tcID *Id) UnmarshalYAML(unmarshal func(interface{}) error) error {
	return unmarshal(&tcID.Name)
}

// ↓ We need the roles tree to know *where* to run it and how to *configure* it, but
//
// the following information is enough to run the task even with no environment or
Expand All @@ -52,13 +65,14 @@ type Class struct {
Control struct {
Mode controlmode.ControlMode `yaml:"mode"`
} `yaml:"control"`
Command *common.CommandInfo `yaml:"command"`
Wants ResourceWants `yaml:"wants"`
Limits *ResourceLimits `yaml:"limits"`
Bind []channel.Inbound `yaml:"bind"`
Properties gera.StringMap `yaml:"properties"`
Constraints []constraint.Constraint `yaml:"constraints"`
Connect []channel.Outbound `yaml:"connect"`
Command *common.CommandInfo `yaml:"command"`
Wants ResourceWants `yaml:"wants"`
Limits *ResourceLimits `yaml:"limits"`
Bind []channel.Inbound `yaml:"bind"`
Properties gera.StringMap `yaml:"properties"`
Constraints []constraint.Constraint `yaml:"constraints"`
Connect []channel.Outbound `yaml:"connect"`
UpdatedTimestamp time.Time `yaml:"-"`
}

func (c *Class) UnmarshalYAML(unmarshal func(interface{}) error) (err error) {
Expand Down Expand Up @@ -94,17 +108,18 @@ func (c *Class) UnmarshalYAML(unmarshal func(interface{}) error) (err error) {
}
}
*c = Class{
Identifier: aux.Identifier,
Defaults: gera.MakeStringMapWithMap(aux.Defaults),
Vars: gera.MakeStringMapWithMap(aux.Vars),
Control: aux.Control,
Command: aux.Command,
Wants: aux.Wants,
Limits: aux.Limits,
Bind: aux.Bind,
Properties: gera.MakeStringMapWithMap(aux.Properties),
Constraints: aux.Constraints,
Connect: aux.Connect,
Identifier: aux.Identifier,
Defaults: gera.MakeStringMapWithMap(aux.Defaults),
Vars: gera.MakeStringMapWithMap(aux.Vars),
Control: aux.Control,
Command: aux.Command,
Wants: aux.Wants,
Limits: aux.Limits,
Bind: aux.Bind,
Properties: gera.MakeStringMapWithMap(aux.Properties),
Constraints: aux.Constraints,
Connect: aux.Connect,
UpdatedTimestamp: time.Now(),
}
}
return
Expand Down Expand Up @@ -150,101 +165,6 @@ func (c *Class) MarshalYAML() (interface{}, error) {
return aux, nil
}

// Id uniquely identifies a task class by the repository it comes from,
// its name within that repository, and a revision hash.
type Id struct {
	RepoIdentifier string
	Hash           string
	Name           string
}

// String renders the identifier as "<repo>/tasks/<name>@<hash>".
func (tcID Id) String() string {
	return fmt.Sprintf("%s/tasks/%s@%s", tcID.RepoIdentifier, tcID.Name, tcID.Hash)
}

// UnmarshalYAML decodes a plain YAML scalar into Name only; RepoIdentifier
// and Hash are not set here.
func (tcID *Id) UnmarshalYAML(unmarshal func(interface{}) error) (err error) {
	err = unmarshal(&tcID.Name)
	return
}

// ResourceWants describes the resources a task class requests: CPU share,
// memory, and inbound port ranges. Nil pointer fields mean "not specified".
// NOTE(review): units are not visible here — presumably CPU cores and
// megabytes; confirm against the deployment code.
type ResourceWants struct {
	Cpu    *float64    `yaml:"cpu"`
	Memory *float64    `yaml:"memory"`
	Ports  port.Ranges `yaml:"ports,omitempty"`
}

// UnmarshalYAML decodes the cpu/memory/ports resource request. All three
// values arrive as YAML strings; cpu and memory are parsed as float64 and
// ports as a port-range expression. Fields absent from the YAML are left
// at their zero values.
func (rw *ResourceWants) UnmarshalYAML(unmarshal func(interface{}) error) error {
	// Intermediate string-typed view of the YAML document.
	type _resourceWants struct {
		Cpu    *string `yaml:"cpu"`
		Memory *string `yaml:"memory"`
		Ports  *string `yaml:"ports"`
	}

	var aux _resourceWants
	if err := unmarshal(&aux); err != nil {
		return err
	}

	if aux.Cpu != nil {
		cpuCount, err := strconv.ParseFloat(*aux.Cpu, 64)
		if err != nil {
			return err
		}
		rw.Cpu = &cpuCount
	}
	if aux.Memory != nil {
		memCount, err := strconv.ParseFloat(*aux.Memory, 64)
		if err != nil {
			return err
		}
		rw.Memory = &memCount
	}
	if aux.Ports != nil {
		ranges, err := port.RangesFromExpression(*aux.Ports)
		if err != nil {
			return err
		}
		rw.Ports = ranges
	}
	return nil
}

type ResourceLimits struct {
Cpu *float64 `yaml:"cpu"`
Memory *float64 `yaml:"memory"`
}

func (rw *ResourceLimits) UnmarshalYAML(unmarshal func(interface{}) error) (err error) {
type _resourceLimits struct {
Cpu *string `yaml:"cpu"`
Memory *string `yaml:"memory"`
}
aux := _resourceLimits{}
err = unmarshal(&aux)
if err != nil {
return
}

if aux.Cpu != nil {
var cpuCount float64
cpuCount, err = strconv.ParseFloat(*aux.Cpu, 64)
if err != nil {
return
}
rw.Cpu = &cpuCount
}
if aux.Memory != nil {
var memCount float64
memCount, err = strconv.ParseFloat(*aux.Memory, 64)
if err != nil {
return
}
rw.Memory = &memCount
}
return
}

func (c *Class) Equals(other *Class) (response bool) {
if c == nil || other == nil {
return false
Expand All @@ -255,73 +175,3 @@ func (c *Class) Equals(other *Class) (response bool) {
c.Wants.Ports.Equals(other.Wants.Ports)
return
}

// Classes is a mutex-guarded registry of task classes indexed by their
// string identifier.
type Classes struct {
	mu       sync.RWMutex
	classMap map[string]*Class
}

// Do runs f with exclusive access to the underlying class map, returning
// whatever f returns.
func (c *Classes) Do(f func(classMap *map[string]*Class) error) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	return f(&c.classMap)
}

// Foreach calls do for every (identifier, class) pair, stopping early the
// first time do returns false. The registry stays locked for the whole
// iteration.
func (c *Classes) Foreach(do func(string, *Class) bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for id, cls := range c.classMap {
		if !do(id, cls) {
			return
		}
	}
}

// getMap returns the internal map itself, not a copy.
// NOTE(review): the reference escapes the lock — confirm all callers treat
// it as read-only.
func (c *Classes) getMap() map[string]*Class {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.classMap
}

// DeleteKey removes a single class entry, if present.
func (c *Classes) DeleteKey(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.classMap, key)
}

// DeleteKeys removes every listed class entry under a single lock
// acquisition.
func (c *Classes) DeleteKeys(keys []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, key := range keys {
		delete(c.classMap, key)
	}
}

// UpdateClass stores class under key. An existing entry is overwritten
// in place (through its pointer) so that holders of the old pointer
// observe the update; otherwise the pointer is inserted as a new entry.
func (c *Classes) UpdateClass(key string, class *Class) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if existing, ok := c.classMap[key]; ok {
		*existing = *class
		return
	}
	c.classMap[key] = class
}

// GetClass looks up a class by identifier; ok reports whether it exists.
func (c *Classes) GetClass(key string) (class *Class, ok bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	class, ok = c.classMap[key]
	return
}

// NewClasses returns an empty, ready-to-use registry.
func NewClasses() *Classes {
	return &Classes{
		classMap: make(map[string]*Class),
	}
}
Loading
Loading