Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize the RegisterFromNodeAnnotations code to make it clearer. Enhance its readability. #133

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func start() {
defer sher.Stop()

// start monitor metrics
go sher.RegisterFromNodeAnnotatons()
go sher.RegisterFromNodeAnnotations()
go initmetrics(config.MetricsBindAddress)

// start http server
Expand Down
183 changes: 92 additions & 91 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,111 +129,112 @@ func (s *Scheduler) Stop() {
close(s.stopCh)
}

func (s *Scheduler) RegisterFromNodeAnnotatons() error {
func (s *Scheduler) RegisterFromNodeAnnotations() {
klog.V(5).Infoln("Scheduler into RegisterFromNodeAnnotations")
nodeInfoCopy := make(map[string]*NodeInfo)
for {
nodes, err := s.nodeLister.List(labels.Everything())
if err != nil {
klog.Errorln("nodes list failed", err.Error())
return err
}
nodeNames := []string{}
for _, val := range nodes {
nodeNames = append(nodeNames, val.Name)
for devhandsk, devreg := range device.KnownDevice {
_, ok := val.Annotations[devreg]
if !ok {
continue
}
nodedevices, err := util.DecodeNodeDevices(val.Annotations[devreg])
if err != nil {
klog.ErrorS(err, "failed to decode node devices", "node", val.Name, "device annotation", val.Annotations[devreg])
continue
}
if len(nodedevices) == 0 {
klog.InfoS("no node gpu device found", "node", val.Name, "device annotation", val.Annotations[devreg])
continue
}
klog.V(5).InfoS("nodes device information", "node", val.Name, "nodedevices", util.EncodeNodeDevices(nodedevices))
handshake := val.Annotations[devhandsk]
if strings.Contains(handshake, "Requesting") {
formertime, _ := time.Parse("2006.01.02 15:04:05", strings.Split(handshake, "_")[1])
if time.Now().After(formertime.Add(time.Second * 60)) {
_, ok := s.nodes[val.Name]
if ok {
_, ok = nodeInfoCopy[devhandsk]
if ok && nodeInfoCopy[devhandsk] != nil {
s.rmNodeDevice(val.Name, nodeInfoCopy[devhandsk])
klog.Infof("node %v device %s:%v leave, %v remaining devices:%v", val.Name, devhandsk, nodeInfoCopy[devhandsk], err, s.nodes[val.Name].Devices)
var nodeNames []string
for _, node := range nodes {
nodeNames = append(nodeNames, node.Name)
s.handleNodeAnnotations(node, nodeInfoCopy)
}
_, _, err = s.getNodesUsage(&nodeNames, nil)
if err != nil {
klog.Errorln("get node usage failed", err.Error())
}
time.Sleep(time.Second * 15)
}
}

tmppat := make(map[string]string)
tmppat[devhandsk] = "Deleted_" + time.Now().Format("2006.01.02 15:04:05")
n, err := util.GetNode(val.Name)
if err != nil {
klog.Errorln("get node failed", err.Error())
continue
}
util.PatchNodeAnnotations(n, tmppat)
continue
}
}
}
continue
} else if strings.Contains(handshake, "Deleted") {
continue
} else {
tmppat := make(map[string]string)
tmppat[devhandsk] = "Requesting_" + time.Now().Format("2006.01.02 15:04:05")
n, err := util.GetNode(val.Name)
if err != nil {
klog.Errorln("get node failed", err.Error())
continue
}
util.PatchNodeAnnotations(n, tmppat)
// handleNodeAnnotations is used to handle node annotations and add node to nodeManager
func (s *Scheduler) handleNodeAnnotations(node *v1.Node, nodeInfoCopy map[string]*NodeInfo) {
for devHandshake, devRegister := range device.KnownDevice {
// check if node has device annotation
if _, ok := node.Annotations[devRegister]; !ok {
continue
}
// decode node device annotation
nodeDevices, err := util.DecodeNodeDevices(node.Annotations[devRegister])
if err != nil {
klog.ErrorS(err, "failed to decode node devices", "node", node.Name, "device annotation", node.Annotations[devRegister])
continue
}
if len(nodeDevices) == 0 {
klog.InfoS("no node gpu device found", "node", node.Name, "device annotation", node.Annotations[devRegister])
continue
}
klog.V(5).InfoS("nodes device information", "node", node.Name, "nodedevices", util.EncodeNodeDevices(nodeDevices))
handshake := node.Annotations[devHandshake]
switch {
case strings.Contains(handshake, "Requesting"):
handshakeTime, _ := time.Parse("2006.01.02 15:04:05", strings.Split(handshake, "_")[1])
// if node device is requesting, and handshake time is less than 60s, we skip it
if time.Now().Before(handshakeTime.Add(time.Second * 60)) {
klog.V(4).InfoS("node device is requesting", "node", node.Name, "device annotation", node.Annotations[devRegister])
continue
}
klog.Infof("node %v device %s handshake timeout", node.Name, devHandshake)
if _, ok := s.nodes[node.Name]; !ok {
klog.Infof("node %v device %s handshake timeout, but node not register", node.Name, devHandshake)
continue
}
if nodeInfo, ok := nodeInfoCopy[devHandshake]; ok && nodeInfo != nil {
s.rmNodeDevice(node.Name, nodeInfoCopy[devHandshake])
klog.Infof("node %v device %s:%v leave, %v remaining devices:%v", node.Name, devHandshake, nodeInfoCopy[devHandshake], err, s.nodes[node.Name].Devices)
if err = util.PatchHandshakeToNodeAnnotation(node.Name, devHandshake, "Deleted"); err != nil {
klog.ErrorS(err, "patch node annotation failed", "node", node.Name, "device annotation", node.Annotations[devRegister])
}
nodeInfo := &NodeInfo{}
nodeInfo.ID = val.Name
nodeInfo.Devices = make([]DeviceInfo, 0)
for index, deviceinfo := range nodedevices {
found := false
_, ok := s.nodes[val.Name]
if ok {
for i1, val1 := range s.nodes[val.Name].Devices {
if strings.Compare(val1.ID, deviceinfo.Id) == 0 {
found = true
s.nodes[val.Name].Devices[i1].Devmem = deviceinfo.Devmem
s.nodes[val.Name].Devices[i1].Devcore = deviceinfo.Devcore
break
}
}
}
if !found {
nodeInfo.Devices = append(nodeInfo.Devices, DeviceInfo{
ID: deviceinfo.Id,
Index: uint(index),
Count: deviceinfo.Count,
Devmem: deviceinfo.Devmem,
Devcore: deviceinfo.Devcore,
Type: deviceinfo.Type,
Numa: deviceinfo.Numa,
Health: deviceinfo.Health,
})
}
continue
case strings.Contains(handshake, "Deleted"):
klog.V(4).Infof("node %v device %s:%v deleted", node.Name, devHandshake, nodeInfoCopy[devHandshake])
continue
}
if err = util.PatchHandshakeToNodeAnnotation(node.Name, devHandshake, "Requesting"); err != nil {
klog.ErrorS(err, "patch node annotation failed", "node", node.Name, "device annotation", node.Annotations[devRegister])
}
// if node device is not requesting, we add node to nodeManager
// and update node device info
nodeInfo := &NodeInfo{}
nodeInfo.ID = node.Name
nodeInfo.Devices = make([]DeviceInfo, 0)
for index, deviceInfo := range nodeDevices {
found := false
if _, ok := s.nodes[node.Name]; ok {
for i1, val1 := range s.nodes[node.Name].Devices {
if strings.Compare(val1.ID, deviceInfo.Id) != 0 {
continue
}
// if device is already registered, we update device info
found = true
s.nodes[node.Name].Devices[i1].Devmem = deviceInfo.Devmem
s.nodes[node.Name].Devices[i1].Devcore = deviceInfo.Devcore
break
}
s.addNode(val.Name, nodeInfo)
nodeInfoCopy[devhandsk] = nodeInfo
if s.nodes[val.Name] != nil && nodeInfo != nil && len(nodeInfo.Devices) > 0 {
klog.Infof("node %v device %s come node info=%v total=%v", val.Name, devhandsk, nodeInfoCopy[devhandsk], s.nodes[val.Name].Devices)
}
}
if !found {
nodeInfo.Devices = append(nodeInfo.Devices, DeviceInfo{
ID: deviceInfo.Id,
Index: uint(index),
Count: deviceInfo.Count,
Devmem: deviceInfo.Devmem,
Devcore: deviceInfo.Devcore,
Type: deviceInfo.Type,
Numa: deviceInfo.Numa,
Health: deviceInfo.Health,
})
}
}
_, _, err = s.getNodesUsage(&nodeNames, nil)
if err != nil {
klog.Errorln("get node usage failed", err.Error())
return err
// add node to nodeManager
s.addNode(node.Name, nodeInfo)
nodeInfoCopy[devHandshake] = nodeInfo
if s.nodes[node.Name] != nil && nodeInfo != nil && len(nodeInfo.Devices) > 0 {
klog.Infof("node %v device %s come node info=%v total=%v", node.Name, devHandshake, nodeInfoCopy[devHandshake], s.nodes[node.Name].Devices)
}
time.Sleep(time.Second * 15)
}
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/Project-HAMi/HAMi/pkg/api"
"github.com/Project-HAMi/HAMi/pkg/util/client"
Expand Down Expand Up @@ -294,6 +295,17 @@ func PatchNodeAnnotations(node *v1.Node, annotations map[string]string) error {
return err
}

func PatchHandshakeToNodeAnnotation(nodeName, handshake, handshakeValue string) error {
tmpAnno := make(map[string]string)
tmpAnno[handshake] = fmt.Sprintf("%s_%s", handshakeValue, time.Now().String())
n, err := GetNode(nodeName)
if err != nil {
klog.Errorln("get node failed", err.Error())
return err
}
return PatchNodeAnnotations(n, tmpAnno)
}

func PatchPodAnnotations(pod *v1.Pod, annotations map[string]string) error {
type patchMetadata struct {
Annotations map[string]string `json:"annotations,omitempty"`
Expand Down
Loading