diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index a6840854b..7df5c1e4f 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -52,7 +52,6 @@ var ( terminatedTimeout = 3 * 24 * time.Hour defaultPlaceholderTimeout = 15 * time.Minute ) - var initAppLogOnce sync.Once var rateLimitedAppLog *log.RateLimitedLogger @@ -191,15 +190,9 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eve app.rmID = rmID app.appEvents = schedEvt.NewApplicationEvents(events.GetEventSystem()) app.appEvents.SendNewApplicationEvent(app.ApplicationID) - app.setNewMetrics() return app } -func (sa *Application) setNewMetrics() { - metrics.GetSchedulerMetrics().IncTotalApplicationsNew() - metrics.GetQueueMetrics(sa.GetQueuePath()).IncQueueApplicationsNew() -} - func (sa *Application) String() string { if sa == nil { return "application is nil" @@ -1641,6 +1634,9 @@ func (sa *Application) SetQueue(queue *Queue) { defer sa.Unlock() sa.queuePath = queue.QueuePath sa.queue = queue + // here we can make sure the queue is not empty + metrics.GetQueueMetrics(queue.QueuePath).IncQueueApplicationsNew() + metrics.GetSchedulerMetrics().IncTotalApplicationsNew() } // remove the leaf queue the application runs in, used when completing the app diff --git a/pkg/scheduler/objects/application_state_test.go b/pkg/scheduler/objects/application_state_test.go index 33c218ffd..16b811443 100644 --- a/pkg/scheduler/objects/application_state_test.go +++ b/pkg/scheduler/objects/application_state_test.go @@ -286,6 +286,7 @@ func TestAppStateTransitionEvents(t *testing.T) { func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen queue := createQueue(t, "metrics") metrics.GetSchedulerMetrics().Reset() + metrics.GetQueueMetrics("root.metrics").Reset() // app-00001: New -> Resuming -> Accepted --> Running -> Completing-> Completed app := newApplication("app-00001", "default", "root.metrics") app.SetQueue(queue) @@ -475,6 +476,12 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen assertQueueApplicationsRejectedMetrics(t, app, 1) assertQueueApplicationsFailedMetrics(t, app, 2) assertQueueApplicationsCompletedMetrics(t, app, 1) + + // app-00005: the queuePath is empty, it will happen for dynamic queue when it before the queue is created + app = newApplication("app-00005", "default", "") + assertState(t, app, nil, New.String()) + assertQueueApplicationsNewMetrics(t, app, 0) + assertTotalAppsNewMetrics(t, 4) } func assertState(t testing.TB, app *Application, err error, expected string) { diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index b47fb2343..ed0365029 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -31,6 +31,7 @@ import ( "github.com/apache/yunikorn-core/pkg/common/resources" "github.com/apache/yunikorn-core/pkg/common/security" "github.com/apache/yunikorn-core/pkg/events" + "github.com/apache/yunikorn-core/pkg/metrics" "github.com/apache/yunikorn-core/pkg/mock" "github.com/apache/yunikorn-core/pkg/plugins" "github.com/apache/yunikorn-core/pkg/rmproxy/rmevent" @@ -246,6 +247,9 @@ func TestRemoveNodeWithAllocations(t *testing.T) { partition, err := newBasePartition() assert.NilError(t, err, "partition create failed") + defer metrics.GetSchedulerMetrics().Reset() + defer metrics.GetQueueMetrics(defQueue).Reset() + // add a new app app := newApplication(appID1, "default", defQueue) err = partition.AddApplication(app) @@ -293,6 +297,9 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) { partition, err := newBasePartition() assert.NilError(t, err, "partition create failed") + defer metrics.GetSchedulerMetrics().Reset() + defer metrics.GetQueueMetrics(defQueue).Reset() + // add a new app app := newApplication(appID1, "default", defQueue) err = partition.AddApplication(app) @@ -395,6 +402,9 @@ func TestPlaceholderDataWithPlaceholderPreemption(t *testing.T) { partition, err := newBasePartition() assert.NilError(t, err, "partition create failed") + defer metrics.GetSchedulerMetrics().Reset() + defer metrics.GetQueueMetrics(defQueue).Reset() + // add a new app1 app1, _ := newApplicationWithHandler(appID1, "default", defQueue) err = partition.AddApplication(app1) @@ -522,6 +532,9 @@ func TestPlaceholderDataWithNodeRemoval(t *testing.T) { partition, err := newBasePartition() assert.NilError(t, err, "partition create failed") + defer metrics.GetSchedulerMetrics().Reset() + defer metrics.GetQueueMetrics(defQueue).Reset() + // add a new app1 app1, _ := newApplicationWithHandler(appID1, "default", defQueue) err = partition.AddApplication(app1) @@ -605,6 +618,9 @@ func TestPlaceholderDataWithRemoval(t *testing.T) { partition, err := newBasePartition() assert.NilError(t, err, "partition create failed") + defer metrics.GetSchedulerMetrics().Reset() + defer metrics.GetQueueMetrics(defQueue).Reset() + // add a new app1 app1, _ := newApplicationWithHandler(appID1, "default", defQueue) err = partition.AddApplication(app1) @@ -698,6 +714,8 @@ func TestRemoveNodeWithReplacement(t *testing.T) { partition, err := newBasePartition() assert.NilError(t, err, "partition create failed") + defer metrics.GetSchedulerMetrics().Reset() + defer metrics.GetQueueMetrics(defQueue).Reset() // add a new app app := newApplication(appID1, "default", defQueue) err = partition.AddApplication(app) @@ -770,6 +788,9 @@ func TestRemoveNodeWithReal(t *testing.T) { partition, err := newBasePartition() assert.NilError(t, err, "partition create failed") + defer metrics.GetSchedulerMetrics().Reset() + defer metrics.GetQueueMetrics(defQueue).Reset() + // add a new app app := newApplication(appID1, "default", defQueue) err = partition.AddApplication(app) @@ -831,6 +852,8 @@ func TestRemoveNodeWithReal(t *testing.T) { } func TestAddApp(t *testing.T) { + defer metrics.GetSchedulerMetrics().Reset() + defer metrics.GetQueueMetrics(defQueue).Reset() partition, err := newBasePartition() assert.NilError(t, err, "partition create failed") @@ -838,6 +861,13 @@ func TestAddApp(t *testing.T) { app := newApplication(appID1, "default", defQueue) err = partition.AddApplication(app) assert.NilError(t, err, "add application to partition should not have failed") + queueApplicationsNew, err := metrics.GetQueueMetrics(defQueue).GetQueueApplicationsNew() + assert.NilError(t, err, "get queue metrics failed") + assert.Equal(t, queueApplicationsNew, 1) + scheduleApplicationsNew, err := metrics.GetSchedulerMetrics().GetTotalApplicationsNew() + assert.NilError(t, err, "get scheduler metrics failed") + assert.Equal(t, scheduleApplicationsNew, 1) + // add the same app err = partition.AddApplication(app) if err == nil { @@ -853,6 +883,12 @@ func TestAddApp(t *testing.T) { if err == nil || partition.getApplication(appID2) != nil { t.Errorf("add application on stopped partition should have failed but did not") } + queueApplicationsNew, err = metrics.GetQueueMetrics(defQueue).GetQueueApplicationsNew() + assert.NilError(t, err, "get queue metrics failed") + assert.Equal(t, queueApplicationsNew, 1) + scheduleApplicationsNew, err = metrics.GetSchedulerMetrics().GetTotalApplicationsNew() + assert.NilError(t, err, "get scheduler metrics failed") + assert.Equal(t, scheduleApplicationsNew, 1) // mark partition for deletion, no new application can be added partition.stateMachine.SetState(objects.Active.String()) @@ -863,6 +899,12 @@ func TestAddApp(t *testing.T) { if err == nil || partition.getApplication(appID3) != nil { t.Errorf("add application on draining partition should have failed but did not") } + queueApplicationsNew, err = metrics.GetQueueMetrics(defQueue).GetQueueApplicationsNew() + assert.NilError(t, err, "get queue metrics failed") + assert.Equal(t, queueApplicationsNew, 1) + scheduleApplicationsNew, err = metrics.GetSchedulerMetrics().GetTotalApplicationsNew() + assert.NilError(t, err, "get scheduler metrics failed") + assert.Equal(t, scheduleApplicationsNew, 1) } func TestAddAppForced(t *testing.T) {