Skip to content

Commit

Permalink
Add tests and renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
jukie committed Oct 11, 2024
1 parent dd71f55 commit aa3ed23
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 13 deletions.
4 changes: 2 additions & 2 deletions pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
nodeclassstatus "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/status"
nodeclasstermination "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/termination"
controllersinstancetype "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype"
controllersdiscoveredcapacitycache "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype/discoveredcapacitycache"
controllersinstancetypecapacity "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype/capacity"
controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/pricing"
"github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate"

Expand Down Expand Up @@ -67,7 +67,7 @@ func NewControllers(ctx context.Context, mgr manager.Manager, sess *session.Sess
nodeclaimtagging.NewController(kubeClient, instanceProvider),
controllerspricing.NewController(pricingProvider),
controllersinstancetype.NewController(instanceTypeProvider),
controllersdiscoveredcapacitycache.NewController(kubeClient, instanceTypeProvider),
controllersinstancetypecapacity.NewController(kubeClient, instanceTypeProvider),
status.NewController[*v1.EC2NodeClass](kubeClient, mgr.GetEventRecorderFor("karpenter")),
}
if options.FromContext(ctx).InterruptionQueue != "" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package discoveredcapacitycache
package capacity

import (
"context"
Expand Down Expand Up @@ -45,16 +45,16 @@ func NewController(kubeClient client.Client, instancetypeProvider *instancetype.
}

func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "instancetypes.discoveredcapacitycache")
if err := c.instancetypeProvider.UpdateDiscoveredCapacityCache(ctx, c.kubeClient, node); err != nil {
ctx = injection.WithControllerName(ctx, "providers.instancetype.capacity")
if err := c.instancetypeProvider.UpdateInstanceTypeCapacityFromNode(ctx, c.kubeClient, node); err != nil {
return reconcile.Result{}, fmt.Errorf("updating discovered capacity cache, %w", err)
}
return reconcile.Result{}, nil
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("instancetypes.discoveredcapacitycache").
Named("providers.instancetype.capacity").
For(&corev1.Node{}, builder.WithPredicates(predicate.TypedFuncs[client.Object]{
// Only trigger reconciliation once a node becomes registered. This is an optimization to omit no-op reconciliations and reduce lock contention on the cache.
UpdateFunc: func(e event.TypedUpdateEvent[client.Object]) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package discoveredcapacitycache_test
package capacity_test

import (
"context"
"fmt"
controllersdiscoveredcapacitycache "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype/discoveredcapacitycache"
controllersinstancetypecapacity "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype/capacity"
"github.com/aws/karpenter-provider-aws/pkg/fake"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -47,7 +47,7 @@ var ctx context.Context
var stop context.CancelFunc
var env *coretest.Environment
var awsEnv *test.Environment
var controller *controllersdiscoveredcapacitycache.Controller
var controller *controllersinstancetypecapacity.Controller

var nodeClass *v1.EC2NodeClass

Expand All @@ -64,7 +64,7 @@ var _ = BeforeSuite(func() {
ctx, stop = context.WithCancel(ctx)
awsEnv = test.NewEnvironment(ctx, env)
nodeClass = test.EC2NodeClass()
controller = controllersdiscoveredcapacitycache.NewController(env.Client, awsEnv.InstanceTypesProvider)
controller = controllersinstancetypecapacity.NewController(env.Client, awsEnv.InstanceTypesProvider)
})

var _ = AfterSuite(func() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/providers/instancetype/instancetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (p *DefaultProvider) List(ctx context.Context, nodeClass *v1.EC2NodeClass)
// Compute hash key against node class AMIs (used to force cache rebuild when AMIs change)
amiHash, _ := hashstructure.Hash(nodeClass.Status.AMIs, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})

key := fmt.Sprintf("%d-%d-%016x-%s-%016x",
key := fmt.Sprintf("%d-%d-%016x-%016x-%016x",
p.instanceTypesSeqNum,
p.instanceTypesOfferingsSeqNum,
amiHash,
Expand Down Expand Up @@ -261,7 +261,7 @@ func (p *DefaultProvider) UpdateInstanceTypeOfferings(ctx context.Context) error
return nil
}

func (p *DefaultProvider) UpdateDiscoveredCapacityCache(ctx context.Context, kubeClient client.Client, node *corev1.Node) error {
func (p *DefaultProvider) UpdateInstanceTypeCapacityFromNode(ctx context.Context, kubeClient client.Client, node *corev1.Node) error {
nodeClaim, err := nodeutils.NodeClaimForNode(ctx, kubeClient, node)
if err != nil {
return fmt.Errorf("failed to get nodeclaim for node, %w", err)
Expand Down
116 changes: 115 additions & 1 deletion pkg/providers/instancetype/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestAWS(t *testing.T) {
}

var _ = BeforeSuite(func() {
env = coretest.NewEnvironment(coretest.WithCRDs(apis.CRDs...), coretest.WithCRDs(v1alpha1.CRDs...))
env = coretest.NewEnvironment(coretest.WithCRDs(apis.CRDs...), coretest.WithCRDs(v1alpha1.CRDs...), coretest.WithFieldIndexers(coretest.NodeClaimFieldIndexer(ctx)))
ctx = coreoptions.ToContext(ctx, coretest.Options())
ctx = options.ToContext(ctx, test.Options())
awsEnv = test.NewEnvironment(ctx, env)
Expand Down Expand Up @@ -109,12 +109,22 @@ var _ = AfterEach(func() {
})

var _ = Describe("InstanceTypeProvider", func() {
var node *corev1.Node
var nodeClaim *karpv1.NodeClaim
var nodeClass, windowsNodeClass *v1.EC2NodeClass
var nodePool, windowsNodePool *karpv1.NodePool
BeforeEach(func() {
nodeClass = test.EC2NodeClass(
v1.EC2NodeClass{
Status: v1.EC2NodeClassStatus{
AMIs: []v1.AMI{{
ID: "ami-test1",
Name: "ami-test-name",
Requirements: []corev1.NodeSelectorRequirement{
{Key: corev1.LabelArchStable, Operator: corev1.NodeSelectorOpIn, Values: []string{karpv1.ArchitectureAmd64}},
{Key: v1.LabelInstanceGPUCount, Operator: corev1.NodeSelectorOpExists},
},
}},
InstanceProfile: "test-profile",
SecurityGroups: []v1.SecurityGroup{
{
Expand Down Expand Up @@ -2399,6 +2409,110 @@ var _ = Describe("InstanceTypeProvider", func() {
uniqueInstanceTypeList(instanceTypeResult)
})
})
Context("Capacity Cache", func() {
BeforeEach(func() {
ctx = options.ToContext(ctx, test.Options(test.OptionsFields{
VMMemoryOverheadPercent: lo.ToPtr[float64](0.075),
}))
awsEnv.EC2API.DescribeInstanceTypesOutput.Set(&ec2.DescribeInstanceTypesOutput{
InstanceTypes: []*ec2.InstanceTypeInfo{
{
InstanceType: aws.String("t3.medium"),
ProcessorInfo: &ec2.ProcessorInfo{
SupportedArchitectures: aws.StringSlice([]string{"x86_64"}),
},
VCpuInfo: &ec2.VCpuInfo{
DefaultCores: aws.Int64(1),
DefaultVCpus: aws.Int64(2),
},
MemoryInfo: &ec2.MemoryInfo{
SizeInMiB: aws.Int64(8192),
},
NetworkInfo: &ec2.NetworkInfo{
Ipv4AddressesPerInterface: aws.Int64(10),
DefaultNetworkCardIndex: aws.Int64(0),
NetworkCards: []*ec2.NetworkCardInfo{{
NetworkCardIndex: lo.ToPtr(int64(0)),
MaximumNetworkInterfaces: aws.Int64(3),
}},
},
SupportedUsageClasses: fake.DefaultSupportedUsageClasses,
},
},
})
awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{
InstanceTypeOfferings: []*ec2.InstanceTypeOffering{
{
InstanceType: aws.String("t3.medium"),
Location: aws.String("test-zone-1a"),
},
},
})
ExpectApplied(ctx, env.Client, nodeClass)
node = coretest.Node(coretest.NodeOptions{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node",
Labels: map[string]string{
corev1.LabelInstanceTypeStable: "t3.medium",
karpv1.NodeRegisteredLabelKey: "true",
},
},
Capacity: corev1.ResourceList{
corev1.ResourceMemory: resource.MustParse(fmt.Sprintf("%dMi", 3840)),
},
})
ExpectApplied(ctx, env.Client, node)
nodeClaim = &karpv1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "test-nodeclaim",
},
Spec: karpv1.NodeClaimSpec{
NodeClassRef: &karpv1.NodeClassReference{
Name: nodeClass.Name,
},
Requirements: make([]karpv1.NodeSelectorRequirementWithMinValues, 0),
},
Status: karpv1.NodeClaimStatus{
NodeName: node.Name,
ImageID: nodeClass.Status.AMIs[0].ID,
},
}
ExpectApplied(ctx, env.Client, nodeClaim)
Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypeCapacityFromNode(ctx, env.Client, node)).To(Succeed())
Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypes(ctx)).To(Succeed())
Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypeOfferings(ctx)).To(Succeed())
})
It("should use capacity cache for previously seen instance-type and AMI", func() {
instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, nodeClass)
Expect(err).To(BeNil())
i, ok := lo.Find(instanceTypes, func(i *corecloudprovider.InstanceType) bool {
return i.Name == "t3.medium"
})
Expect(ok).To(BeTrue())
Expect(i.Capacity.Memory().Value()).To(Equal(node.Status.Capacity.Memory().Value()))
})
It("should use VM_MEMORY_OVERHEAD_PERCENT calculation after AMI update", func() {
// Trigger building cache for instance-types based on current AMI
_, err := awsEnv.InstanceTypesProvider.List(ctx, nodeClass)
Expect(err).To(BeNil())

// Update NodeClass AMI and re-list instance-types. Cached values from the prior AMI should no longer be used.
nodeClass.Status.AMIs[0].ID = "ami-new-test-id"
ExpectApplied(ctx, env.Client, nodeClaim)
Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypeCapacityFromNode(ctx, env.Client, node)).To(Succeed())
instanceTypesNoCache, err := awsEnv.InstanceTypesProvider.List(ctx, nodeClass)
Expect(err).To(BeNil())
i, ok := lo.Find(instanceTypesNoCache, func(i *corecloudprovider.InstanceType) bool {
return i.Name == "t3.medium"
})
Expect(ok).To(BeTrue())

// Expect no cache hit and fallback to VM_MEMORY_OVERHEAD_PERCENT calculation
mem := resources.Quantity(fmt.Sprintf("%dMi", 8192))
mem.Sub(resource.MustParse(fmt.Sprintf("%dMi", int64(math.Ceil(float64(mem.Value())*options.FromContext(ctx).VMMemoryOverheadPercent/1024/1024)))))
Expect(i.Capacity.Memory().Value()).To(Equal(mem.Value()), "Expected capacity to match VMMemoryOverheadPercent calculation")
})
})
It("should not cause data races when calling List() simultaneously", func() {
mu := sync.RWMutex{}
var instanceTypeOrder []string
Expand Down
3 changes: 3 additions & 0 deletions pkg/test/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Environment struct {
SecurityGroupCache *cache.Cache
InstanceProfileCache *cache.Cache
SSMCache *cache.Cache
DiscoveredCapacityCache *cache.Cache

// Providers
InstanceTypesResolver *instancetype.DefaultResolver
Expand Down Expand Up @@ -165,6 +166,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment
InstanceProfileCache: instanceProfileCache,
UnavailableOfferingsCache: unavailableOfferingsCache,
SSMCache: ssmCache,
DiscoveredCapacityCache: discoveredCapacityCache,

InstanceTypesResolver: instanceTypesResolver,
InstanceTypesProvider: instanceTypesProvider,
Expand Down Expand Up @@ -200,6 +202,7 @@ func (env *Environment) Reset() {
env.SecurityGroupCache.Flush()
env.InstanceProfileCache.Flush()
env.SSMCache.Flush()
env.DiscoveredCapacityCache.Flush()
mfs, err := crmetrics.Registry.Gather()
if err != nil {
for _, mf := range mfs {
Expand Down

0 comments on commit aa3ed23

Please sign in to comment.