Skip to content

Commit

Permalink
Fix crash in podgroup when runLauncherAsWorker is true (#669)
Browse files Browse the repository at this point in the history
* Fix crash in podgroup when runLauncherAsWorker is true

Signed-off-by: GonzaloSaez <11050889+GonzaloSaez@users.noreply.github.com>

* Address comments

Signed-off-by: GonzaloSaez <11050889+GonzaloSaez@users.noreply.github.com>

---------

Signed-off-by: GonzaloSaez <11050889+GonzaloSaez@users.noreply.github.com>
  • Loading branch information
GonzaloSaez authored Jan 16, 2025
1 parent d0ab239 commit 7f94988
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 1 deletion.
7 changes: 6 additions & 1 deletion pkg/controller/podgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,12 @@ func calPGMinResource(minMember *int32, mpiJob *kubeflow.MPIJob, pcLister schedu

sort.Sort(sort.Reverse(order))
// Launcher + Worker > minMember
if minMember != nil && *order[0].Replicas+*order[1].Replicas > *minMember {
replicas := *order[0].Replicas
if len(order) > 1 {
// When using runLauncherAsWorker, there may be no worker.
replicas += *order[1].Replicas
}
if minMember != nil && replicas > *minMember {
// If the launcher and workers have the same priority, it treats workers as a lower priority.
if order[0].priority == order[1].priority {
wIndex := order.getWorkerIndex()
Expand Down
105 changes: 105 additions & 0 deletions pkg/controller/podgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ var (
corev1.ResourceMemory: resource.MustParse("512Gi"),
"example.com/gpu": resource.MustParse("40"),
}

minResourcesNoMinMember = &corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("2Gi"),
}
)

func TestNewPodGroup(t *testing.T) {
Expand Down Expand Up @@ -208,6 +213,73 @@ func TestNewPodGroup(t *testing.T) {
},
},
},
"no worker no MinResources": {
mpiJob: &kubeflow.MPIJob{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Annotations: map[string]string{
volcanov1beta1.QueueNameAnnotationKey: "project-x",
},
},
Spec: kubeflow.MPIJobSpec{
RunLauncherAsWorker: ptr.To[bool](true),
RunPolicy: kubeflow.RunPolicy{
SchedulingPolicy: &kubeflow.SchedulingPolicy{
MinAvailable: ptr.To[int32](1),
Queue: "project-y",
PriorityClass: "high",
ScheduleTimeoutSeconds: ptr.To[int32](100),
},
},
MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
kubeflow.MPIReplicaTypeLauncher: {
Replicas: ptr.To[int32](1),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("2Gi"),
},
},
}},
},
},
},
},
},
},
wantVolcanoPG: &volcanov1beta1.PodGroup{
TypeMeta: metav1.TypeMeta{
APIVersion: volcanov1beta1.SchemeGroupVersion.String(),
Kind: "PodGroup",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: volcanov1beta1.PodGroupSpec{
MinMember: 1,
Queue: "project-y",
PriorityClassName: "high",
MinResources: minResourcesNoMinMember,
},
},
wantSchedPG: &schedv1alpha1.PodGroup{
TypeMeta: metav1.TypeMeta{
APIVersion: schedv1alpha1.SchemeGroupVersion.String(),
Kind: "PodGroup",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: schedv1alpha1.PodGroupSpec{
MinMember: 1,
MinResources: *minResourcesNoMinMember,
ScheduleTimeoutSeconds: ptr.To[int32](100),
},
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -447,6 +519,39 @@ func TestCalculatePGMinResources(t *testing.T) {
corev1.ResourceMemory: resource.MustParse("65Gi"),
},
},
"without worker without priorityClass": {
minMember: 1,
job: &kubeflow.MPIJob{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: kubeflow.MPIJobSpec{
MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
kubeflow.MPIReplicaTypeLauncher: {
Replicas: ptr.To[int32](1),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
},
},
},
},
},
},
},
},
},
want: &corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
},
}
for name, tc := range volcanoTests {
t.Run(name, func(t *testing.T) {
Expand Down

0 comments on commit 7f94988

Please sign in to comment.