From fc0ef70ead99fd6bc9016e593e49a55ae78c560f Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Mon, 27 May 2024 15:43:28 +0800 Subject: [PATCH] Fix the issue where scheduling can still occur on the node when the device-plugin crashes. Signed-off-by: chaunceyjiang --- pkg/device/ascend/device.go | 4 +- pkg/device/cambricon/device.go | 4 +- pkg/device/devices.go | 2 +- pkg/device/hygon/device.go | 2 +- pkg/device/iluvatar/device.go | 4 +- pkg/device/nvidia/device.go | 2 +- pkg/scheduler/nodes.go | 5 ++- pkg/scheduler/scheduler.go | 43 +++++++++----------- pkg/scheduler/scheduler_test.go | 70 +++++++++++++++++---------------- pkg/util/types.go | 17 ++++---- pkg/util/util.go | 11 +++--- 11 files changed, 83 insertions(+), 81 deletions(-) diff --git a/pkg/device/ascend/device.go b/pkg/device/ascend/device.go index 9671b2a5c..7759d8ad2 100644 --- a/pkg/device/ascend/device.go +++ b/pkg/device/ascend/device.go @@ -177,8 +177,8 @@ func (dev *AscendDevices) CheckUUID(annos map[string]string, d util.DeviceUsage) return true } -func (dev *AscendDevices) CheckHealth(devType string, n *corev1.Node) (bool, bool) { - return true, true +func (dev *AscendDevices) CheckHealth(devType string, n *corev1.Node) bool { + return true } func trimMemory(i int64) int64 { diff --git a/pkg/device/cambricon/device.go b/pkg/device/cambricon/device.go index 14e9e157f..036fb6274 100644 --- a/pkg/device/cambricon/device.go +++ b/pkg/device/cambricon/device.go @@ -167,8 +167,8 @@ func (dev *CambriconDevices) NodeCleanUp(nn string) error { return nil } -func (dev *CambriconDevices) CheckHealth(devType string, n *corev1.Node) (bool, bool) { - return true, true +func (dev *CambriconDevices) CheckHealth(devType string, n *corev1.Node) bool { + return true } func (dev *CambriconDevices) GetNodeDevices(n corev1.Node) ([]*api.DeviceInfo, error) { diff --git a/pkg/device/devices.go b/pkg/device/devices.go index 49b514342..fbfffd4f9 100644 --- a/pkg/device/devices.go +++ b/pkg/device/devices.go @@ -39,7 +39,7 @@ import ( type Devices interface { MutateAdmission(ctr *corev1.Container) (bool, error) - CheckHealth(devType string, n *corev1.Node) (bool, bool) + CheckHealth(devType string, n *corev1.Node) bool NodeCleanUp(nn string) error GetNodeDevices(n corev1.Node) ([]*api.DeviceInfo, error) CheckType(annos map[string]string, d util.DeviceUsage, n util.ContainerDeviceRequest) (bool, bool, bool) diff --git a/pkg/device/hygon/device.go b/pkg/device/hygon/device.go index 325cf01e6..b9c092069 100644 --- a/pkg/device/hygon/device.go +++ b/pkg/device/hygon/device.go @@ -131,7 +131,7 @@ func (dev *DCUDevices) NodeCleanUp(nn string) error { return util.MarkAnnotationsToDelete(HygonDCUDevice, nn) } -func (dev *DCUDevices) CheckHealth(devType string, n *corev1.Node) (bool, bool) { +func (dev *DCUDevices) CheckHealth(devType string, n *corev1.Node) bool { return util.CheckHealth(devType, n) } diff --git a/pkg/device/iluvatar/device.go b/pkg/device/iluvatar/device.go index 194ccfda9..9ffd3c399 100644 --- a/pkg/device/iluvatar/device.go +++ b/pkg/device/iluvatar/device.go @@ -158,8 +158,8 @@ func (dev *IluvatarDevices) CheckUUID(annos map[string]string, d util.DeviceUsag return true } -func (dev *IluvatarDevices) CheckHealth(devType string, n *corev1.Node) (bool, bool) { - return true, true +func (dev *IluvatarDevices) CheckHealth(devType string, n *corev1.Node) bool { + return true } func (dev *IluvatarDevices) GenerateResourceRequests(ctr *corev1.Container) util.ContainerDeviceRequest { diff --git a/pkg/device/nvidia/device.go b/pkg/device/nvidia/device.go index 8cbeaa50d..9b31a740d 100644 --- a/pkg/device/nvidia/device.go +++ b/pkg/device/nvidia/device.go @@ -81,7 +81,7 @@ func (dev *NvidiaGPUDevices) NodeCleanUp(nn string) error { return util.MarkAnnotationsToDelete(NvidiaGPUDevice, nn) } -func (dev *NvidiaGPUDevices) CheckHealth(devType string, n *corev1.Node) (bool, bool) { +func (dev *NvidiaGPUDevices) CheckHealth(devType string, n *corev1.Node) bool { return util.CheckHealth(devType, n) } diff --git a/pkg/scheduler/nodes.go b/pkg/scheduler/nodes.go index 41c94f4c5..b70fbd8cd 100644 --- a/pkg/scheduler/nodes.go +++ b/pkg/scheduler/nodes.go @@ -57,7 +57,7 @@ func (m *nodeManager) addNode(nodeID string, nodeInfo *util.NodeInfo) { } } -func (m *nodeManager) rmNodeDevice(nodeID string, nodeInfo *util.NodeInfo) { +func (m *nodeManager) rmNodeDevice(nodeID string, nodeInfo *util.NodeInfo, deviceVendor string) { m.mutex.Lock() defer m.mutex.Unlock() _, ok := m.nodes[nodeID] @@ -69,6 +69,9 @@ func (m *nodeManager) rmNodeDevice(nodeID string, nodeInfo *util.NodeInfo) { klog.V(5).Infoln("before rm:", m.nodes[nodeID].Devices, "needs remove", nodeInfo.Devices) tmp := make([]util.DeviceInfo, 0, len(m.nodes[nodeID].Devices)-len(nodeInfo.Devices)) for _, val := range m.nodes[nodeID].Devices { + if strings.Compare(val.DeviceVendor, deviceVendor) != 0 { + continue + } found := false for _, rmval := range nodeInfo.Devices { if strings.Compare(val.ID, rmval.ID) == 0 { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index ce3d2aa6f..1e8b330b2 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -150,7 +150,6 @@ func (s *Scheduler) Stop() { func (s *Scheduler) RegisterFromNodeAnnotations() { klog.V(5).Infoln("Scheduler into RegisterFromNodeAnnotations") - nodeInfoCopy := make(map[string]*util.NodeInfo) ticker := time.NewTicker(time.Second * 15) for { select { @@ -168,26 +167,20 @@ func (s *Scheduler) RegisterFromNodeAnnotations() { for _, val := range nodes { nodeNames = append(nodeNames, val.Name) for devhandsk, devInstance := range device.GetDevices() { - health, needUpdate := devInstance.CheckHealth(devhandsk, val) + health := devInstance.CheckHealth(devhandsk, val) if !health { - _, ok := s.nodes[val.Name] + info, ok := s.nodes[val.Name] if ok { - _, ok = nodeInfoCopy[devhandsk] - if ok && nodeInfoCopy[devhandsk] != nil { - s.rmNodeDevice(val.Name, nodeInfoCopy[devhandsk]) - klog.Infof("node %v device %s:%v leave, %v remaining devices:%v", val.Name, devhandsk, nodeInfoCopy[devhandsk], err, s.nodes[val.Name].Devices) - - err := devInstance.NodeCleanUp(val.Name) - if err != nil { - klog.ErrorS(err, "markAnnotationsToDeleteFailed") - } - continue + err := devInstance.NodeCleanUp(val.Name) + if err != nil { + klog.ErrorS(err, "markAnnotationsToDeleteFailed") } + s.rmNodeDevice(val.Name, info, devhandsk) + klog.Infof("node %v device %s:%v leave, %v remaining devices:%v", val.Name, devhandsk, info, err, s.nodes[val.Name].Devices) } - } - if !needUpdate { continue } + _, ok := util.HandshakeAnnos[devhandsk] if ok { tmppat := make(map[string]string) @@ -223,21 +216,21 @@ func (s *Scheduler) RegisterFromNodeAnnotations() { } if !found { nodeInfo.Devices = append(nodeInfo.Devices, util.DeviceInfo{ - ID: deviceinfo.Id, - Index: uint(deviceinfo.Index), - Count: deviceinfo.Count, - Devmem: deviceinfo.Devmem, - Devcore: deviceinfo.Devcore, - Type: deviceinfo.Type, - Numa: deviceinfo.Numa, - Health: deviceinfo.Health, + ID: deviceinfo.Id, + Index: uint(deviceinfo.Index), + Count: deviceinfo.Count, + Devmem: deviceinfo.Devmem, + Devcore: deviceinfo.Devcore, + Type: deviceinfo.Type, + Numa: deviceinfo.Numa, + Health: deviceinfo.Health, + DeviceVendor: devhandsk, }) } } s.addNode(val.Name, nodeInfo) - nodeInfoCopy[devhandsk] = nodeInfo if s.nodes[val.Name] != nil && len(nodeInfo.Devices) > 0 { - klog.Infof("node %v device %s come node info=%v total=%v", val.Name, devhandsk, nodeInfoCopy[devhandsk], s.nodes[val.Name].Devices) + klog.Infof("node %v device %s come node info=%v total=%v", val.Name, devhandsk, nodeInfo, s.nodes[val.Name].Devices) } } } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index d3644ff8b..2b9114188 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -221,7 +221,7 @@ func Test_Filter(t *testing.T) { nodes, _ := s.ListNodes() for index := range nodes { node := nodes[index] - s.rmNodeDevice(node.ID, node) + s.rmNodeDevice(node.ID, node, nvidia.NvidiaGPUDevice) } pods, _ := s.ListPods() for index := range pods { @@ -232,24 +232,26 @@ func Test_Filter(t *testing.T) { ID: "node1", Devices: []util.DeviceInfo{ { - ID: "device1", - Index: 0, - Count: 10, - Devmem: 8000, - Devcore: 100, - Numa: 0, - Type: nvidia.NvidiaGPUDevice, - Health: true, + ID: "device1", + Index: 0, + Count: 10, + Devmem: 8000, + Devcore: 100, + Numa: 0, + Type: nvidia.NvidiaGPUDevice, + Health: true, + DeviceVendor: nvidia.NvidiaGPUDevice, }, { - ID: "device2", - Index: 1, - Count: 10, - Devmem: 8000, - Devcore: 100, - Numa: 0, - Type: nvidia.NvidiaGPUDevice, - Health: true, + ID: "device2", + Index: 1, + Count: 10, + Devmem: 8000, + Devcore: 100, + Numa: 0, + Type: nvidia.NvidiaGPUDevice, + Health: true, + DeviceVendor: nvidia.NvidiaGPUDevice, }, }, }) @@ -257,24 +259,26 @@ func Test_Filter(t *testing.T) { ID: "node2", Devices: []util.DeviceInfo{ { - ID: "device3", - Index: 0, - Count: 10, - Devmem: 8000, - Devcore: 100, - Numa: 0, - Type: nvidia.NvidiaGPUDevice, - Health: true, + ID: "device3", + Index: 0, + Count: 10, + Devmem: 8000, + Devcore: 100, + Numa: 0, + Type: nvidia.NvidiaGPUDevice, + Health: true, + DeviceVendor: nvidia.NvidiaGPUDevice, }, { - ID: "device4", - Index: 1, - Count: 10, - Devmem: 8000, - Devcore: 100, - Numa: 0, - Type: nvidia.NvidiaGPUDevice, - Health: true, + ID: "device4", + Index: 1, + Count: 10, + Devmem: 8000, + Devcore: 100, + Numa: 0, + Type: nvidia.NvidiaGPUDevice, + Health: true, + DeviceVendor: nvidia.NvidiaGPUDevice, }, }, }) diff --git a/pkg/util/types.go b/pkg/util/types.go index 410936bce..ce0c23a9a 100644 --- a/pkg/util/types.go +++ b/pkg/util/types.go @@ -124,14 +124,15 @@ type DeviceUsage struct { } type DeviceInfo struct { - ID string - Index uint - Count int32 - Devmem int32 - Devcore int32 - Type string - Numa int - Health bool + ID string + Index uint + Count int32 + Devmem int32 + Devcore int32 + Type string + Numa int + Health bool + DeviceVendor string } type NodeInfo struct { diff --git a/pkg/util/util.go b/pkg/util/util.go index 2d71742f2..3698d6190 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -344,16 +344,17 @@ func InitKlogFlags() *flag.FlagSet { return flagset } -func CheckHealth(devType string, n *corev1.Node) (bool, bool) { +func CheckHealth(devType string, n *corev1.Node) bool { handshake := n.Annotations[HandshakeAnnos[devType]] if strings.Contains(handshake, "Requesting") { formertime, _ := time.Parse("2006.01.02 15:04:05", strings.Split(handshake, "_")[1]) - return time.Now().Before(formertime.Add(time.Second * 60)), false + return time.Now().Before(formertime.Add(time.Second * 60)) } else if strings.Contains(handshake, "Deleted") { - return true, false - } else { - return true, true + return true + } else if strings.Contains(handshake, "Reported") { + return true } + return false } func MarkAnnotationsToDelete(devType string, nn string) error {