diff --git a/Makefile b/Makefile index 760bb55..9579f92 100644 --- a/Makefile +++ b/Makefile @@ -153,7 +153,7 @@ dev-run-debug: dev-scheduler-conf # # E2E test # -export E2E_GOMEGA_DEFAULT_EVENTUALLY_TIMEOUT=1m +export E2E_GOMEGA_DEFAULT_EVENTUALLY_TIMEOUT=90s export E2E_GOMEGA_DEFAULT_CONSISTENTLY_DURATION=2s E2E_PAUSE_IMAGE=k8s.gcr.io/pause:3.2 E2E_KIND_KUBECNOFIG = $(DEV_TOOL_PREFIX)/.kubeconfig diff --git a/pkg/controllers/controllers_suite_test.go b/pkg/controllers/controllers_suite_test.go new file mode 100644 index 0000000..3b58908 --- /dev/null +++ b/pkg/controllers/controllers_suite_test.go @@ -0,0 +1,30 @@ +// Licensed to Shingo Omura under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Shingo Omura licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package controllers + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestControllers(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Controllers Suite") +} diff --git a/pkg/controllers/reserved_resource_ammounts.go b/pkg/controllers/reserved_resource_amounts.go similarity index 87% rename from pkg/controllers/reserved_resource_ammounts.go rename to pkg/controllers/reserved_resource_amounts.go index a7b66c3..f033b6f 100644 --- a/pkg/controllers/reserved_resource_ammounts.go +++ b/pkg/controllers/reserved_resource_amounts.go @@ -19,6 +19,7 @@ package controllers import ( "strings" + "sync" schedulev1alpha1 "github.com/everpeace/kube-throttler/pkg/apis/schedule/v1alpha1" corev1 "k8s.io/api/core/v1" @@ -29,9 +30,14 @@ import ( ) type reservedResourceAmounts struct { - // [Cluster]Throttle Name -> podResourceAmountMap - cache map[types.NamespacedName]podResourceAmountMap + // This protects concurrent accesses to cache itself + sync.RWMutex + + // This protects concurrent accesses on the same (Cluster)Throttle's key keyMutex keymutex.KeyMutex + + // [Cluster]Throttle Name -> podResourceAmountMap + cache map[types.NamespacedName]podResourceAmountMap } func newReservedResourceAmounts(n int) *reservedResourceAmounts { @@ -41,18 +47,25 @@ func newReservedResourceAmounts(n int) *reservedResourceAmounts { } } +func (c *reservedResourceAmounts) getPodResourceAmountMap(nn types.NamespacedName) podResourceAmountMap { + c.Lock() + defer c.Unlock() + if _, ok := c.cache[nn]; !ok { + c.cache[nn] = podResourceAmountMap{} + } + return c.cache[nn] +} + func (c *reservedResourceAmounts) addPod(nn types.NamespacedName, pod *corev1.Pod) bool { c.keyMutex.LockKey(nn.String()) defer func() { _ = c.keyMutex.UnlockKey(nn.String()) }() - if _, ok := c.cache[nn]; !ok { - c.cache[nn] = podResourceAmountMap{} - } + m := c.getPodResourceAmountMap(nn) + added := m.add(pod) - added := c.cache[nn].add(pod) - klog.V(5).InfoS("reservedResourceAmounts.addPod", "Pod", pod.Namespace+"/"+pod.Name, "NamespacedName", nn.String(), "Cache", c.cache) + klog.V(5).InfoS("reservedResourceAmounts.addPod", "Pod", pod.Namespace+"/"+pod.Name, "NamespacedName", nn.String(), "Cache", m) return added } @@ -62,12 +75,10 @@ func (c *reservedResourceAmounts) removePod(nn types.NamespacedName, pod *corev1 _ = c.keyMutex.UnlockKey(nn.String()) }() - if _, ok := c.cache[nn]; !ok { - c.cache[nn] = podResourceAmountMap{} - } + m := c.getPodResourceAmountMap(nn) + removed := m.remove(pod) - removed := c.cache[nn].remove(pod) - klog.V(5).InfoS("reservedResourceAmounts.removePod", "Pod", pod.Namespace+"/"+pod.Name, "NamespacedName", nn.String(), "Removed", removed, "Cache", c.cache) + klog.V(5).InfoS("reservedResourceAmounts.removePod", "Pod", pod.Namespace+"/"+pod.Name, "NamespacedName", nn.String(), "Removed", removed, "Cache", m) return removed } diff --git a/pkg/controllers/reserved_resource_amounts_test.go b/pkg/controllers/reserved_resource_amounts_test.go new file mode 100644 index 0000000..b26854c --- /dev/null +++ b/pkg/controllers/reserved_resource_amounts_test.go @@ -0,0 +1,97 @@ +// Licensed to Shingo Omura under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Shingo Omura licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package controllers + +import ( + "fmt" + "sync" + + . "github.com/onsi/ginkgo" + // . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +var _ = Describe("ReservedResourceAmounts", func() { + n := 2000 + pod := func(i int) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", i), + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{}, + }, + } + } + It("should be threadsafe across different throttle's namespacenames", func() { + r := newReservedResourceAmounts(1024) + add := &sync.WaitGroup{} + add.Add(n) + for i := 0; i < n; i++ { + go func(j int) { + r.addPod( + types.NamespacedName{Name: fmt.Sprintf("%d", j)}, + pod(j), + ) + add.Done() + }(i) + } + remove := &sync.WaitGroup{} + remove.Add(n) + for i := 0; i < n; i++ { + go func(j int) { + r.removePod( + types.NamespacedName{Name: fmt.Sprintf("%d", j)}, + pod(j), + ) + remove.Done() + }(i) + } + add.Wait() + remove.Wait() + }) + It("should be threadsafe on specific throttle's namespacedname", func() { + r := newReservedResourceAmounts(1024) + add := &sync.WaitGroup{} + add.Add(n) + for i := 0; i < n; i++ { + go func(j int) { + r.addPod( + types.NamespacedName{Name: "test"}, + pod(j), + ) + add.Done() + }(i) + } + remove := &sync.WaitGroup{} + remove.Add(n) + for i := 0; i < n; i++ { + go func(j int) { + r.removePod( + types.NamespacedName{Name: "test"}, + pod(j), + ) + remove.Done() + }(i) + } + add.Wait() + remove.Wait() + }) +})