diff --git a/pkg/metrics/queue.go b/pkg/metrics/queue.go index 1a2c36706..fc1181646 100644 --- a/pkg/metrics/queue.go +++ b/pkg/metrics/queue.go @@ -23,6 +23,8 @@ import ( dto "github.com/prometheus/client_model/go" "go.uber.org/zap" + "github.com/apache/yunikorn-core/pkg/common/resources" + "github.com/apache/yunikorn-core/pkg/locking" "github.com/apache/yunikorn-core/pkg/log" ) @@ -57,6 +59,9 @@ type QueueMetrics struct { resourceMetricsLabel *prometheus.GaugeVec // Deprecated - To be removed in 1.7.0. Replaced with queue label Metrics resourceMetricsSubsystem *prometheus.GaugeVec + // Track known resource types + knownResourceTypes map[string]struct{} + lock locking.Mutex } // InitQueueMetrics to initialize queue metrics @@ -123,6 +128,7 @@ func InitQueueMetrics(name string) *QueueMetrics { } } + q.knownResourceTypes = make(map[string]struct{}) return q } @@ -142,10 +148,13 @@ func (m *QueueMetrics) setQueueResource(state string, resourceName string, value } func (m *QueueMetrics) Reset() { + m.lock.Lock() + defer m.lock.Unlock() m.appMetricsLabel.Reset() m.appMetricsSubsystem.Reset() m.resourceMetricsLabel.Reset() m.resourceMetricsSubsystem.Reset() + m.knownResourceTypes = make(map[string]struct{}) } func (m *QueueMetrics) IncQueueApplicationsRunning() { @@ -301,12 +310,22 @@ func (m *QueueMetrics) AddReleasedContainers(value int) { m.containerMetrics.WithLabelValues(ContainerReleased).Add(float64(value)) } -func (m *QueueMetrics) SetQueueGuaranteedResourceMetrics(resourceName string, value float64) { - m.setQueueResource(QueueGuaranteed, resourceName, value) -} +func (m *QueueMetrics) UpdateQueueResourceMetrics(state string, newResources map[string]resources.Quantity) { + m.lock.Lock() + defer m.lock.Unlock() + // Iterate over new resource types and set their values + for resourceName, value := range newResources { + m.setQueueResource(state, resourceName, float64(value)) + // Add new resources to the known list + m.knownResourceTypes[resourceName] = struct{}{} + } -func (m *QueueMetrics) SetQueueMaxResourceMetrics(resourceName string, value float64) { - m.setQueueResource(QueueMax, resourceName, value) + // Emit old resource types that are missing in the new collection with zero + for resourceName := range m.knownResourceTypes { + if _, exists := newResources[resourceName]; !exists { + m.setQueueResource(state, resourceName, float64(0)) + } + } } func (m *QueueMetrics) SetQueueAllocatedResourceMetrics(resourceName string, value float64) { diff --git a/pkg/metrics/queue_test.go b/pkg/metrics/queue_test.go index ea2376bd8..9a9678645 100644 --- a/pkg/metrics/queue_test.go +++ b/pkg/metrics/queue_test.go @@ -26,6 +26,8 @@ import ( "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + + "github.com/apache/yunikorn-core/pkg/common/resources" ) var qm *QueueMetrics @@ -196,16 +198,28 @@ func TestQueueGuaranteedResourceMetrics(t *testing.T) { qm = getQueueMetrics() defer unregisterQueueMetrics() - qm.SetQueueGuaranteedResourceMetrics("cpu", 1) + qm.UpdateQueueResourceMetrics("guaranteed", map[string]resources.Quantity{ + "cpu": 1, + }) verifyResourceMetrics(t, "guaranteed", "cpu") + assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu": {}}) + + qm.UpdateQueueResourceMetrics("guaranteed", map[string]resources.Quantity{"memory": 1}) + assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu": {}, "memory": {}}) } func TestQueueMaxResourceMetrics(t *testing.T) { qm = getQueueMetrics() defer unregisterQueueMetrics() - qm.SetQueueMaxResourceMetrics("cpu", 1) + qm.UpdateQueueResourceMetrics("max", map[string]resources.Quantity{ + "cpu": 1, + }) verifyResourceMetrics(t, "max", "cpu") + assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu": {}}) + + qm.UpdateQueueResourceMetrics("max", map[string]resources.Quantity{"memory": 1}) + assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu": {}, "memory": {}}) } func TestQueueAllocatedResourceMetrics(t *testing.T) { @@ -364,4 +378,5 @@ func unregisterQueueMetrics() { prometheus.Unregister(qm.containerMetrics) prometheus.Unregister(qm.resourceMetricsLabel) prometheus.Unregister(qm.resourceMetricsSubsystem) + qm.knownResourceTypes = make(map[string]struct{}) } diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go index 7e21bc8f7..6ac9dc16e 100644 --- a/pkg/scheduler/objects/queue.go +++ b/pkg/scheduler/objects/queue.go @@ -1657,20 +1657,22 @@ func (sq *Queue) SupportTaskGroup() bool { // updateGuaranteedResourceMetrics updates guaranteed resource metrics. func (sq *Queue) updateGuaranteedResourceMetrics() { + queueMetrics := metrics.GetQueueMetrics(sq.QueuePath) + resourcesToUpdate := map[string]resources.Quantity{} if sq.guaranteedResource != nil { - for k, v := range sq.guaranteedResource.Resources { - metrics.GetQueueMetrics(sq.QueuePath).SetQueueGuaranteedResourceMetrics(k, float64(v)) - } + resourcesToUpdate = sq.guaranteedResource.Resources } + queueMetrics.UpdateQueueResourceMetrics(metrics.QueueGuaranteed, resourcesToUpdate) } // updateMaxResourceMetrics updates max resource metrics. func (sq *Queue) updateMaxResourceMetrics() { + queueMetrics := metrics.GetQueueMetrics(sq.QueuePath) + resourcesToUpdate := map[string]resources.Quantity{} if sq.maxResource != nil { - for k, v := range sq.maxResource.Resources { - metrics.GetQueueMetrics(sq.QueuePath).SetQueueMaxResourceMetrics(k, float64(v)) - } + resourcesToUpdate = sq.maxResource.Resources } + queueMetrics.UpdateQueueResourceMetrics(metrics.QueueMax, resourcesToUpdate) } // updateAllocatedResourceMetrics updates allocated resource metrics for all queue types.