From 782e180d0a1ceb8187875ed2c39994260195a558 Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Tue, 23 Jan 2024 15:32:35 +0800 Subject: [PATCH] Optimize the RegisterFromNodeAnnotations code to make it clearer. Enhance its readability. Signed-off-by: chaunceyjiang --- cmd/scheduler/main.go | 2 +- pkg/scheduler/scheduler.go | 183 +++++++++++++++++++------------------ pkg/util/util.go | 12 +++ 3 files changed, 105 insertions(+), 92 deletions(-) diff --git a/cmd/scheduler/main.go b/cmd/scheduler/main.go index d773359cb..52f2c8f39 100644 --- a/cmd/scheduler/main.go +++ b/cmd/scheduler/main.go @@ -67,7 +67,7 @@ func start() { defer sher.Stop() // start monitor metrics - go sher.RegisterFromNodeAnnotatons() + go sher.RegisterFromNodeAnnotations() go initmetrics(config.MetricsBindAddress) // start http server diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 6d6716442..e5d5d9a78 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -129,111 +129,112 @@ func (s *Scheduler) Stop() { close(s.stopCh) } -func (s *Scheduler) RegisterFromNodeAnnotatons() error { +func (s *Scheduler) RegisterFromNodeAnnotations() { klog.V(5).Infoln("Scheduler into RegisterFromNodeAnnotations") nodeInfoCopy := make(map[string]*NodeInfo) for { nodes, err := s.nodeLister.List(labels.Everything()) if err != nil { klog.Errorln("nodes list failed", err.Error()) - return err } - nodeNames := []string{} - for _, val := range nodes { - nodeNames = append(nodeNames, val.Name) - for devhandsk, devreg := range device.KnownDevice { - _, ok := val.Annotations[devreg] - if !ok { - continue - } - nodedevices, err := util.DecodeNodeDevices(val.Annotations[devreg]) - if err != nil { - klog.ErrorS(err, "failed to decode node devices", "node", val.Name, "device annotation", val.Annotations[devreg]) - continue - } - if len(nodedevices) == 0 { - klog.InfoS("no node gpu device found", "node", val.Name, "device annotation", val.Annotations[devreg]) - continue - } - klog.V(5).InfoS("nodes device information", "node", val.Name, "nodedevices", util.EncodeNodeDevices(nodedevices)) - handshake := val.Annotations[devhandsk] - if strings.Contains(handshake, "Requesting") { - formertime, _ := time.Parse("2006.01.02 15:04:05", strings.Split(handshake, "_")[1]) - if time.Now().After(formertime.Add(time.Second * 60)) { - _, ok := s.nodes[val.Name] - if ok { - _, ok = nodeInfoCopy[devhandsk] - if ok && nodeInfoCopy[devhandsk] != nil { - s.rmNodeDevice(val.Name, nodeInfoCopy[devhandsk]) - klog.Infof("node %v device %s:%v leave, %v remaining devices:%v", val.Name, devhandsk, nodeInfoCopy[devhandsk], err, s.nodes[val.Name].Devices) + var nodeNames []string + for _, node := range nodes { + nodeNames = append(nodeNames, node.Name) + s.handleNodeAnnotations(node, nodeInfoCopy) + } + _, _, err = s.getNodesUsage(&nodeNames, nil) + if err != nil { + klog.Errorln("get node usage failed", err.Error()) + } + time.Sleep(time.Second * 15) + } +} - tmppat := make(map[string]string) - tmppat[devhandsk] = "Deleted_" + time.Now().Format("2006.01.02 15:04:05") - n, err := util.GetNode(val.Name) - if err != nil { - klog.Errorln("get node failed", err.Error()) - continue - } - util.PatchNodeAnnotations(n, tmppat) - continue - } - } - } - continue - } else if strings.Contains(handshake, "Deleted") { - continue - } else { - tmppat := make(map[string]string) - tmppat[devhandsk] = "Requesting_" + time.Now().Format("2006.01.02 15:04:05") - n, err := util.GetNode(val.Name) - if err != nil { - klog.Errorln("get node failed", err.Error()) - continue - } - util.PatchNodeAnnotations(n, tmppat) +// handleNodeAnnotations is used to handle node annotations and add node to nodeManager +func (s *Scheduler) handleNodeAnnotations(node *v1.Node, nodeInfoCopy map[string]*NodeInfo) { + for devHandshake, devRegister := range device.KnownDevice { + // check if node has device annotation + if _, ok := node.Annotations[devRegister]; !ok { + continue + } + // decode node device annotation + nodeDevices, err := util.DecodeNodeDevices(node.Annotations[devRegister]) + if err != nil { + klog.ErrorS(err, "failed to decode node devices", "node", node.Name, "device annotation", node.Annotations[devRegister]) + continue + } + if len(nodeDevices) == 0 { + klog.InfoS("no node gpu device found", "node", node.Name, "device annotation", node.Annotations[devRegister]) + continue + } + klog.V(5).InfoS("nodes device information", "node", node.Name, "nodedevices", util.EncodeNodeDevices(nodeDevices)) + handshake := node.Annotations[devHandshake] + switch { + case strings.Contains(handshake, "Requesting"): + handshakeTime, _ := time.Parse("2006.01.02 15:04:05", strings.Split(handshake, "_")[1]) + // if node device is requesting, and handshake time is less than 60s, we skip it + if time.Now().Before(handshakeTime.Add(time.Second * 60)) { + klog.V(4).InfoS("node device is requesting", "node", node.Name, "device annotation", node.Annotations[devRegister]) + continue + } + klog.Infof("node %v device %s handshake timeout", node.Name, devHandshake) + if _, ok := s.nodes[node.Name]; !ok { + klog.Infof("node %v device %s handshake timeout, but node not register", node.Name, devHandshake) + continue + } + if nodeInfo, ok := nodeInfoCopy[devHandshake]; ok && nodeInfo != nil { + s.rmNodeDevice(node.Name, nodeInfoCopy[devHandshake]) + klog.Infof("node %v device %s:%v leave, %v remaining devices:%v", node.Name, devHandshake, nodeInfoCopy[devHandshake], err, s.nodes[node.Name].Devices) + if err = util.PatchHandshakeToNodeAnnotation(node.Name, devHandshake, "Deleted"); err != nil { + klog.ErrorS(err, "patch node annotation failed", "node", node.Name, "device annotation", node.Annotations[devRegister]) } - nodeInfo := &NodeInfo{} - nodeInfo.ID = val.Name - nodeInfo.Devices = make([]DeviceInfo, 0) - for index, deviceinfo := range nodedevices { - found := false - _, ok := s.nodes[val.Name] - if ok { - for i1, val1 := range s.nodes[val.Name].Devices { - if strings.Compare(val1.ID, deviceinfo.Id) == 0 { - found = true - s.nodes[val.Name].Devices[i1].Devmem = deviceinfo.Devmem - s.nodes[val.Name].Devices[i1].Devcore = deviceinfo.Devcore - break - } - } - } - if !found { - nodeInfo.Devices = append(nodeInfo.Devices, DeviceInfo{ - ID: deviceinfo.Id, - Index: uint(index), - Count: deviceinfo.Count, - Devmem: deviceinfo.Devmem, - Devcore: deviceinfo.Devcore, - Type: deviceinfo.Type, - Numa: deviceinfo.Numa, - Health: deviceinfo.Health, - }) + } + continue + case strings.Contains(handshake, "Deleted"): + klog.V(4).Infof("node %v device %s:%v deleted", node.Name, devHandshake, nodeInfoCopy[devHandshake]) + continue + } + if err = util.PatchHandshakeToNodeAnnotation(node.Name, devHandshake, "Requesting"); err != nil { + klog.ErrorS(err, "patch node annotation failed", "node", node.Name, "device annotation", node.Annotations[devRegister]) + } + // if node device is not requesting, we add node to nodeManager + // and update node device info + nodeInfo := &NodeInfo{} + nodeInfo.ID = node.Name + nodeInfo.Devices = make([]DeviceInfo, 0) + for index, deviceInfo := range nodeDevices { + found := false + if _, ok := s.nodes[node.Name]; ok { + for i1, val1 := range s.nodes[node.Name].Devices { + if strings.Compare(val1.ID, deviceInfo.Id) != 0 { + continue } + // if device is already registered, we update device info + found = true + s.nodes[node.Name].Devices[i1].Devmem = deviceInfo.Devmem + s.nodes[node.Name].Devices[i1].Devcore = deviceInfo.Devcore + break } - s.addNode(val.Name, nodeInfo) - nodeInfoCopy[devhandsk] = nodeInfo - if s.nodes[val.Name] != nil && nodeInfo != nil && len(nodeInfo.Devices) > 0 { - klog.Infof("node %v device %s come node info=%v total=%v", val.Name, devhandsk, nodeInfoCopy[devhandsk], s.nodes[val.Name].Devices) - } + } + if !found { + nodeInfo.Devices = append(nodeInfo.Devices, DeviceInfo{ + ID: deviceInfo.Id, + Index: uint(index), + Count: deviceInfo.Count, + Devmem: deviceInfo.Devmem, + Devcore: deviceInfo.Devcore, + Type: deviceInfo.Type, + Numa: deviceInfo.Numa, + Health: deviceInfo.Health, + }) } } - _, _, err = s.getNodesUsage(&nodeNames, nil) - if err != nil { - klog.Errorln("get node usage failed", err.Error()) - return err + // add node to nodeManager + s.addNode(node.Name, nodeInfo) + nodeInfoCopy[devHandshake] = nodeInfo + if s.nodes[node.Name] != nil && nodeInfo != nil && len(nodeInfo.Devices) > 0 { + klog.Infof("node %v device %s come node info=%v total=%v", node.Name, devHandshake, nodeInfoCopy[devHandshake], s.nodes[node.Name].Devices) } - time.Sleep(time.Second * 15) } } diff --git a/pkg/util/util.go b/pkg/util/util.go index 1d7b80659..c3c68d891 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -24,6 +24,7 @@ import ( "fmt" "strconv" "strings" + "time" "github.com/Project-HAMi/HAMi/pkg/api" "github.com/Project-HAMi/HAMi/pkg/util/client" @@ -294,6 +295,17 @@ func PatchNodeAnnotations(node *v1.Node, annotations map[string]string) error { return err } +func PatchHandshakeToNodeAnnotation(nodeName, handshake, handshakeValue string) error { + tmpAnno := make(map[string]string) + tmpAnno[handshake] = fmt.Sprintf("%s_%s", handshakeValue, time.Now().String()) + n, err := GetNode(nodeName) + if err != nil { + klog.Errorln("get node failed", err.Error()) + return err + } + return PatchNodeAnnotations(n, tmpAnno) +} + func PatchPodAnnotations(pod *v1.Pod, annotations map[string]string) error { type patchMetadata struct { Annotations map[string]string `json:"annotations,omitempty"`