Skip to content

Commit

Permalink
Optimize the RegisterFromNodeAnnotations code to make it clearer. Enh…
Browse files Browse the repository at this point in the history
…ance its readability.

Signed-off-by: chaunceyjiang <chaunceyjiang@gmail.com>
  • Loading branch information
chaunceyjiang committed Feb 28, 2024
1 parent aae4e8c commit 782e180
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 92 deletions.
2 changes: 1 addition & 1 deletion cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func start() {
defer sher.Stop()

// start monitor metrics
go sher.RegisterFromNodeAnnotatons()
go sher.RegisterFromNodeAnnotations()
go initmetrics(config.MetricsBindAddress)

// start http server
Expand Down
183 changes: 92 additions & 91 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,111 +129,112 @@ func (s *Scheduler) Stop() {
close(s.stopCh)
}

// RegisterFromNodeAnnotations polls the node lister in an endless loop. On each
// pass it lists every node, hands each node to handleNodeAnnotations together
// with the shared nodeInfoCopy map, refreshes usage through getNodesUsage, and
// then sleeps for 15 seconds before the next pass.
//
// NOTE(review): this span is a rendered diff, so lines from the removed
// RegisterFromNodeAnnotatons body are interleaved with the new body. Confirm
// against the real file how the list-failure path returns now that the new
// signature has no result value.
func (s *Scheduler) RegisterFromNodeAnnotatons() error {
func (s *Scheduler) RegisterFromNodeAnnotations() {
klog.V(5).Infoln("Scheduler into RegisterFromNodeAnnotations")
// nodeInfoCopy is created once and reused by every pass of the loop below.
nodeInfoCopy := make(map[string]*NodeInfo)
for {
nodes, err := s.nodeLister.List(labels.Everything())
if err != nil {
klog.Errorln("nodes list failed", err.Error())
return err
}
nodeNames := []string{}
for _, val := range nodes {
nodeNames = append(nodeNames, val.Name)
for devhandsk, devreg := range device.KnownDevice {
_, ok := val.Annotations[devreg]
if !ok {
continue
}
nodedevices, err := util.DecodeNodeDevices(val.Annotations[devreg])
if err != nil {
klog.ErrorS(err, "failed to decode node devices", "node", val.Name, "device annotation", val.Annotations[devreg])
continue
}
if len(nodedevices) == 0 {
klog.InfoS("no node gpu device found", "node", val.Name, "device annotation", val.Annotations[devreg])
continue
}
klog.V(5).InfoS("nodes device information", "node", val.Name, "nodedevices", util.EncodeNodeDevices(nodedevices))
handshake := val.Annotations[devhandsk]
if strings.Contains(handshake, "Requesting") {
formertime, _ := time.Parse("2006.01.02 15:04:05", strings.Split(handshake, "_")[1])
if time.Now().After(formertime.Add(time.Second * 60)) {
_, ok := s.nodes[val.Name]
if ok {
_, ok = nodeInfoCopy[devhandsk]
if ok && nodeInfoCopy[devhandsk] != nil {
s.rmNodeDevice(val.Name, nodeInfoCopy[devhandsk])
klog.Infof("node %v device %s:%v leave, %v remaining devices:%v", val.Name, devhandsk, nodeInfoCopy[devhandsk], err, s.nodes[val.Name].Devices)
// Collect the node names so the usage refresh below covers every listed node.
var nodeNames []string
for _, node := range nodes {
nodeNames = append(nodeNames, node.Name)
s.handleNodeAnnotations(node, nodeInfoCopy)
}
// A usage refresh failure is only logged here; the loop goes on to sleep
// and retry on the next pass.
_, _, err = s.getNodesUsage(&nodeNames, nil)
if err != nil {
klog.Errorln("get node usage failed", err.Error())
}
time.Sleep(time.Second * 15)
}
}

tmppat := make(map[string]string)
tmppat[devhandsk] = "Deleted_" + time.Now().Format("2006.01.02 15:04:05")
n, err := util.GetNode(val.Name)
if err != nil {
klog.Errorln("get node failed", err.Error())
continue
}
util.PatchNodeAnnotations(n, tmppat)
continue
}
}
}
continue
} else if strings.Contains(handshake, "Deleted") {
continue
} else {
tmppat := make(map[string]string)
tmppat[devhandsk] = "Requesting_" + time.Now().Format("2006.01.02 15:04:05")
n, err := util.GetNode(val.Name)
if err != nil {
klog.Errorln("get node failed", err.Error())
continue
}
util.PatchNodeAnnotations(n, tmppat)
// handleNodeAnnotations processes the device annotations of a single node.
//
// It ranges over device.KnownDevice, using each key as the handshake
// annotation name and each value as the register annotation name. Device kinds
// whose register annotation is absent, fails to decode, or decodes to zero
// devices are skipped. For the rest, the handshake annotation decides what
// happens:
//   - contains "Requesting": skipped while the stamped time is under 60s old;
//     once older, the devices recorded in nodeInfoCopy are removed from the
//     node and the handshake is patched to "Deleted".
//   - contains "Deleted": skipped.
//   - anything else: the handshake is patched to "Requesting", devices already
//     known for the node get their Devmem/Devcore refreshed, unknown devices
//     are gathered into a new NodeInfo that is passed to addNode and stored in
//     nodeInfoCopy.
//
// NOTE(review): nodeInfoCopy is indexed by the handshake annotation name, not
// by node name, so entries are shared across all nodes handled with the same
// map — confirm this is intended.
// NOTE(review): this span is a rendered diff; lines of the removed
// implementation (using val/devhandsk/nodedevices) are interleaved below.
func (s *Scheduler) handleNodeAnnotations(node *v1.Node, nodeInfoCopy map[string]*NodeInfo) {
for devHandshake, devRegister := range device.KnownDevice {
// check if node has device annotation
if _, ok := node.Annotations[devRegister]; !ok {
continue
}
// decode node device annotation
nodeDevices, err := util.DecodeNodeDevices(node.Annotations[devRegister])
if err != nil {
klog.ErrorS(err, "failed to decode node devices", "node", node.Name, "device annotation", node.Annotations[devRegister])
continue
}
if len(nodeDevices) == 0 {
klog.InfoS("no node gpu device found", "node", node.Name, "device annotation", node.Annotations[devRegister])
continue
}
klog.V(5).InfoS("nodes device information", "node", node.Name, "nodedevices", util.EncodeNodeDevices(nodeDevices))
handshake := node.Annotations[devHandshake]
switch {
case strings.Contains(handshake, "Requesting"):
// NOTE(review): presumably the value always has the form
// "<state>_<timestamp>"; indexing [1] panics when no "_" is present.
// The parse error is discarded, so a timestamp that does not match the
// "2006.01.02 15:04:05" layout yields the zero time and the check below
// treats the handshake as timed out.
handshakeTime, _ := time.Parse("2006.01.02 15:04:05", strings.Split(handshake, "_")[1])
// if node device is requesting, and handshake time is less than 60s, we skip it
if time.Now().Before(handshakeTime.Add(time.Second * 60)) {
klog.V(4).InfoS("node device is requesting", "node", node.Name, "device annotation", node.Annotations[devRegister])
continue
}
klog.Infof("node %v device %s handshake timeout", node.Name, devHandshake)
if _, ok := s.nodes[node.Name]; !ok {
klog.Infof("node %v device %s handshake timeout, but node not register", node.Name, devHandshake)
continue
}
if nodeInfo, ok := nodeInfoCopy[devHandshake]; ok && nodeInfo != nil {
s.rmNodeDevice(node.Name, nodeInfoCopy[devHandshake])
// NOTE(review): err is necessarily nil here (a non-nil decode error
// already hit `continue` above), so the fourth argument always logs <nil>.
klog.Infof("node %v device %s:%v leave, %v remaining devices:%v", node.Name, devHandshake, nodeInfoCopy[devHandshake], err, s.nodes[node.Name].Devices)
if err = util.PatchHandshakeToNodeAnnotation(node.Name, devHandshake, "Deleted"); err != nil {
klog.ErrorS(err, "patch node annotation failed", "node", node.Name, "device annotation", node.Annotations[devRegister])
}
nodeInfo := &NodeInfo{}
nodeInfo.ID = val.Name
nodeInfo.Devices = make([]DeviceInfo, 0)
for index, deviceinfo := range nodedevices {
found := false
_, ok := s.nodes[val.Name]
if ok {
for i1, val1 := range s.nodes[val.Name].Devices {
if strings.Compare(val1.ID, deviceinfo.Id) == 0 {
found = true
s.nodes[val.Name].Devices[i1].Devmem = deviceinfo.Devmem
s.nodes[val.Name].Devices[i1].Devcore = deviceinfo.Devcore
break
}
}
}
if !found {
nodeInfo.Devices = append(nodeInfo.Devices, DeviceInfo{
ID: deviceinfo.Id,
Index: uint(index),
Count: deviceinfo.Count,
Devmem: deviceinfo.Devmem,
Devcore: deviceinfo.Devcore,
Type: deviceinfo.Type,
Numa: deviceinfo.Numa,
Health: deviceinfo.Health,
})
}
continue
case strings.Contains(handshake, "Deleted"):
klog.V(4).Infof("node %v device %s:%v deleted", node.Name, devHandshake, nodeInfoCopy[devHandshake])
continue
}
// Any other handshake value: stamp a fresh "Requesting" handshake. A patch
// failure is only logged; registration below still proceeds.
if err = util.PatchHandshakeToNodeAnnotation(node.Name, devHandshake, "Requesting"); err != nil {
klog.ErrorS(err, "patch node annotation failed", "node", node.Name, "device annotation", node.Annotations[devRegister])
}
// if node device is not requesting, we add node to nodeManager
// and update node device info
nodeInfo := &NodeInfo{}
nodeInfo.ID = node.Name
nodeInfo.Devices = make([]DeviceInfo, 0)
for index, deviceInfo := range nodeDevices {
found := false
if _, ok := s.nodes[node.Name]; ok {
for i1, val1 := range s.nodes[node.Name].Devices {
if strings.Compare(val1.ID, deviceInfo.Id) != 0 {
continue
}
// if device is already registered, we update device info
found = true
s.nodes[node.Name].Devices[i1].Devmem = deviceInfo.Devmem
s.nodes[node.Name].Devices[i1].Devcore = deviceInfo.Devcore
break
}
s.addNode(val.Name, nodeInfo)
nodeInfoCopy[devhandsk] = nodeInfo
if s.nodes[val.Name] != nil && nodeInfo != nil && len(nodeInfo.Devices) > 0 {
klog.Infof("node %v device %s come node info=%v total=%v", val.Name, devhandsk, nodeInfoCopy[devhandsk], s.nodes[val.Name].Devices)
}
}
// Only devices not already known for this node are added to the new NodeInfo.
if !found {
nodeInfo.Devices = append(nodeInfo.Devices, DeviceInfo{
ID: deviceInfo.Id,
Index: uint(index),
Count: deviceInfo.Count,
Devmem: deviceInfo.Devmem,
Devcore: deviceInfo.Devcore,
Type: deviceInfo.Type,
Numa: deviceInfo.Numa,
Health: deviceInfo.Health,
})
}
}
_, _, err = s.getNodesUsage(&nodeNames, nil)
if err != nil {
klog.Errorln("get node usage failed", err.Error())
return err
// add node to nodeManager
s.addNode(node.Name, nodeInfo)
nodeInfoCopy[devHandshake] = nodeInfo
if s.nodes[node.Name] != nil && nodeInfo != nil && len(nodeInfo.Devices) > 0 {
klog.Infof("node %v device %s come node info=%v total=%v", node.Name, devHandshake, nodeInfoCopy[devHandshake], s.nodes[node.Name].Devices)
}
time.Sleep(time.Second * 15)
}
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/Project-HAMi/HAMi/pkg/api"
"github.com/Project-HAMi/HAMi/pkg/util/client"
Expand Down Expand Up @@ -294,6 +295,17 @@ func PatchNodeAnnotations(node *v1.Node, annotations map[string]string) error {
return err
}

// PatchHandshakeToNodeAnnotation sets the annotation named handshake on the
// node called nodeName to "<handshakeValue>_<timestamp>", where the timestamp
// is the current time.
//
// The timestamp is written with the layout "2006.01.02 15:04:05" because the
// scheduler reads the part after "_" back with time.Parse using exactly that
// layout. The default time.Time.String() rendering does not match it, so the
// parse would fail and every handshake would look as if it had already timed
// out.
//
// It returns the error from GetNode if the node cannot be fetched, otherwise
// the result of PatchNodeAnnotations.
func PatchHandshakeToNodeAnnotation(nodeName, handshake, handshakeValue string) error {
	tmpAnno := map[string]string{
		handshake: fmt.Sprintf("%s_%s", handshakeValue, time.Now().Format("2006.01.02 15:04:05")),
	}
	n, err := GetNode(nodeName)
	if err != nil {
		klog.Errorln("get node failed", err.Error())
		return err
	}
	return PatchNodeAnnotations(n, tmpAnno)
}

func PatchPodAnnotations(pod *v1.Pod, annotations map[string]string) error {
type patchMetadata struct {
Annotations map[string]string `json:"annotations,omitempty"`
Expand Down

0 comments on commit 782e180

Please sign in to comment.