Skip to content

Commit

Permalink
Add instance type and instance type offering generator
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis committed Sep 29, 2024
1 parent 8676ebf commit ecb969b
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 222 deletions.
3 changes: 0 additions & 3 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ limitations under the License.
package main

import (
"github.com/samber/lo"

"github.com/aws/karpenter-provider-aws/pkg/cloudprovider"
"github.com/aws/karpenter-provider-aws/pkg/controllers"
"github.com/aws/karpenter-provider-aws/pkg/operator"
Expand All @@ -37,7 +35,6 @@ func main() {
op.AMIProvider,
op.SecurityGroupProvider,
)
lo.Must0(op.AddHealthzCheck("cloud-provider", awsCloudProvider.LivenessProbe))
cloudProvider := metrics.Decorate(awsCloudProvider)

op.
Expand Down
14 changes: 8 additions & 6 deletions hack/docs/instancetypes_gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,14 @@ below are the resources available with some assumptions and after the instance o
cache.New(awscache.InstanceTypesAndZonesTTL, awscache.DefaultCleanupInterval),
ec2api,
subnetProvider,
awscache.NewUnavailableOfferings(),
pricing.NewDefaultProvider(
ctx,
pricing.NewAPI(sess, *sess.Config.Region),
ec2api,
*sess.Config.Region,
instancetype.NewDefaultGenerator(
pricing.NewDefaultProvider(
ctx,
pricing.NewAPI(sess, *sess.Config.Region),
ec2api,
*sess.Config.Region,
),
awscache.NewUnavailableOfferings(),
),
)
if err = instanceTypeProvider.UpdateInstanceTypes(ctx); err != nil {
Expand Down
5 changes: 0 additions & 5 deletions pkg/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
stderrors "errors"
"fmt"
"net/http"
"time"

"github.com/aws/aws-sdk-go/service/ec2"
Expand Down Expand Up @@ -159,10 +158,6 @@ func (c *CloudProvider) Get(ctx context.Context, providerID string) (*karpv1.Nod
return c.instanceToNodeClaim(instance, instanceType, nc), nil
}

func (c *CloudProvider) LivenessProbe(req *http.Request) error {
return c.instanceTypeProvider.LivenessProbe(req)
}

// GetInstanceTypes returns all available InstanceTypes
func (c *CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *karpv1.NodePool) ([]*cloudprovider.InstanceType, error) {
nodeClass, err := c.resolveNodeClassFromNodePool(ctx, nodePool)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import (
func NewControllers(ctx context.Context, mgr manager.Manager, sess *session.Session, clk clock.Clock, kubeClient client.Client, recorder events.Recorder,
unavailableOfferings *cache.UnavailableOfferings, cloudProvider cloudprovider.CloudProvider, subnetProvider subnet.Provider,
securityGroupProvider securitygroup.Provider, instanceProfileProvider instanceprofile.Provider, instanceProvider instance.Provider,
pricingProvider pricing.Provider, amiProvider amifamily.Provider, launchTemplateProvider launchtemplate.Provider, instanceTypeProvider instancetype.Provider) []controller.Controller {
pricingProvider pricing.Provider, amiProvider amifamily.Provider, launchTemplateProvider launchtemplate.Provider, instanceTypeProvider *instancetype.DefaultProvider) []controller.Controller {

controllers := []controller.Controller{
nodeclasshash.NewController(kubeClient),
Expand Down
10 changes: 5 additions & 5 deletions pkg/controllers/providers/instancetype/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,21 @@ import (
)

type Controller struct {
instancetypeProvider instancetype.Provider
instanceTypeProvider *instancetype.DefaultProvider
}

func NewController(instancetypeProvider instancetype.Provider) *Controller {
func NewController(instanceTypeProvider *instancetype.DefaultProvider) *Controller {
return &Controller{
instancetypeProvider: instancetypeProvider,
instanceTypeProvider: instanceTypeProvider,
}
}

func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "providers.instancetype")

work := []func(ctx context.Context) error{
c.instancetypeProvider.UpdateInstanceTypes,
c.instancetypeProvider.UpdateInstanceTypeOfferings,
c.instanceTypeProvider.UpdateInstanceTypes,
c.instanceTypeProvider.UpdateInstanceTypeOfferings,
}
errs := make([]error, len(work))
lop.ForEach(work, func(f func(ctx context.Context) error, i int) {
Expand Down
5 changes: 2 additions & 3 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type Operator struct {
LaunchTemplateProvider launchtemplate.Provider
PricingProvider pricing.Provider
VersionProvider version.Provider
InstanceTypesProvider instancetype.Provider
InstanceTypesProvider *instancetype.DefaultProvider
InstanceProvider instance.Provider
SSMProvider ssmp.Provider
}
Expand Down Expand Up @@ -162,8 +162,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
cache.New(awscache.InstanceTypesAndZonesTTL, awscache.DefaultCleanupInterval),
ec2api,
subnetProvider,
unavailableOfferingsCache,
pricingProvider,
instancetype.NewDefaultGenerator(pricingProvider, unavailableOfferingsCache),
)
instanceProvider := instance.NewDefaultProvider(
ctx,
Expand Down
185 changes: 54 additions & 131 deletions pkg/providers/instancetype/instancetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,61 +17,39 @@ package instancetype
import (
"context"
"fmt"
"net/http"
"sync"
"sync/atomic"

"github.com/awslabs/operatorpkg/option"
"github.com/mitchellh/hashstructure/v2"
"github.com/patrickmn/go-cache"
"github.com/prometheus/client_golang/prometheus"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/log"

karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/scheduling"

v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
awscache "github.com/aws/karpenter-provider-aws/pkg/cache"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/aws/karpenter-provider-aws/pkg/providers/amifamily"
"github.com/aws/karpenter-provider-aws/pkg/providers/pricing"
v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"

"github.com/aws/karpenter-provider-aws/pkg/providers/subnet"

"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/utils/pretty"
)

type Provider interface {
LivenessProbe(*http.Request) error
List(context.Context, *v1.KubeletConfiguration, *v1.EC2NodeClass) ([]*cloudprovider.InstanceType, error)
UpdateInstanceTypes(ctx context.Context) error
UpdateInstanceTypeOfferings(ctx context.Context) error
}

type defaultProviderOptions struct {
instanceTypeFilterMap func(*cloudprovider.InstanceType, *v1.EC2NodeClass) (*cloudprovider.InstanceType, bool)
}

var WithInstanceTypeFilterMap = func(instanceTypeFilterMap func(*cloudprovider.InstanceType, *v1.EC2NodeClass) (*cloudprovider.InstanceType, bool)) func(*defaultProviderOptions) {
return func(opts *defaultProviderOptions) {
opts.instanceTypeFilterMap = instanceTypeFilterMap
}
}

type DefaultProvider struct {
defaultProviderOptions

region string
ec2api ec2iface.EC2API
subnetProvider subnet.Provider
pricingProvider pricing.Provider
region string
ec2api ec2iface.EC2API
subnetProvider subnet.Provider
instanceTypeGenerator Generator

// Values stored *before* considering insufficient capacity errors from the unavailableOfferings cache.
// Fully initialized Instance Types are also cached based on the set of all instance types, zones, unavailableOfferings cache,
Expand All @@ -85,39 +63,28 @@ type DefaultProvider struct {
instanceTypeOfferings map[string]sets.Set[string]

instanceTypesCache *cache.Cache

unavailableOfferings *awscache.UnavailableOfferings
cm *pretty.ChangeMonitor
cm *pretty.ChangeMonitor
// instanceTypesSeqNum is a monotonically increasing change counter used to avoid the expensive hashing operation on instance types
instanceTypesSeqNum uint64
// instanceTypeOfferingsSeqNum is a monotonically increasing change counter used to avoid the expensive hashing operation on instance types
instanceTypeOfferingsSeqNum uint64
}

func NewDefaultProvider(region string, instanceTypesCache *cache.Cache, ec2api ec2iface.EC2API, subnetProvider subnet.Provider,
unavailableOfferingsCache *awscache.UnavailableOfferings, pricingProvider pricing.Provider, opts ...option.Function[defaultProviderOptions]) *DefaultProvider {
resolvedOpts := *option.Resolve(opts...)
if resolvedOpts.instanceTypeFilterMap == nil {
resolvedOpts.instanceTypeFilterMap = func(it *cloudprovider.InstanceType, _ *v1.EC2NodeClass) (*cloudprovider.InstanceType, bool) {
return it, true
}
}
func NewDefaultProvider(region string, instanceTypesCache *cache.Cache, ec2api ec2iface.EC2API, subnetProvider subnet.Provider, instanceTypesGenerator Generator) *DefaultProvider {
return &DefaultProvider{

defaultProviderOptions: resolvedOpts,
ec2api: ec2api,
region: region,
subnetProvider: subnetProvider,
pricingProvider: pricingProvider,
instanceTypesInfo: []*ec2.InstanceTypeInfo{},
instanceTypeOfferings: map[string]sets.Set[string]{},
instanceTypesCache: instanceTypesCache,
unavailableOfferings: unavailableOfferingsCache,
cm: pretty.NewChangeMonitor(),
instanceTypesSeqNum: 0,
ec2api: ec2api,
region: region,
subnetProvider: subnetProvider,
instanceTypesInfo: []*ec2.InstanceTypeInfo{},
instanceTypeOfferings: map[string]sets.Set[string]{},
instanceTypeGenerator: instanceTypesGenerator,
instanceTypesCache: instanceTypesCache,
cm: pretty.NewChangeMonitor(),
instanceTypesSeqNum: 0,
}
}

// nolint:gocyclo
func (p *DefaultProvider) List(ctx context.Context, kc *v1.KubeletConfiguration, nodeClass *v1.EC2NodeClass) ([]*cloudprovider.InstanceType, error) {
p.muInstanceTypeInfo.RLock()
p.muInstanceTypeOfferings.RLock()
Expand Down Expand Up @@ -145,15 +112,15 @@ func (p *DefaultProvider) List(ctx context.Context, kc *v1.KubeletConfiguration,
subnetZonesHash, _ := hashstructure.Hash(subnetZones, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
kcHash, _ := hashstructure.Hash(kc, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
blockDeviceMappingsHash, _ := hashstructure.Hash(nodeClass.Spec.BlockDeviceMappings, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
key := fmt.Sprintf("%d-%d-%d-%016x-%016x-%016x-%s-%s",
key := fmt.Sprintf("%d-%d-%016x-%016x-%016x-%s-%s-%s",
p.instanceTypesSeqNum,
p.instanceTypeOfferingsSeqNum,
p.unavailableOfferings.SeqNum,
subnetZonesHash,
kcHash,
blockDeviceMappingsHash,
lo.FromPtr((*string)(nodeClass.Spec.InstanceStorePolicy)),
nodeClass.AMIFamily(),
p.instanceTypeGenerator.Key(),
)
if item, ok := p.instanceTypesCache.Get(key); ok {
// Ensure what's returned from this function is a shallow-copy of the slice (not a deep-copy of the data itself)
Expand All @@ -172,36 +139,54 @@ func (p *DefaultProvider) List(ctx context.Context, kc *v1.KubeletConfiguration,
if p.cm.HasChanged("zones", allZones) {
log.FromContext(ctx).WithValues("zones", allZones.UnsortedList()).V(1).Info("discovered zones")
}
amiFamily := amifamily.GetAMIFamily(nodeClass.AMIFamily(), &amifamily.Options{})
result := lo.FilterMap(p.instanceTypesInfo, func(i *ec2.InstanceTypeInfo, _ int) (*cloudprovider.InstanceType, bool) {
subnetZoneToID := lo.SliceToMap(nodeClass.Status.Subnets, func(s v1.Subnet) (string, string) {
return s.Zone, s.ZoneID
})
result := lo.Map(p.instanceTypesInfo, func(i *ec2.InstanceTypeInfo, _ int) *cloudprovider.InstanceType {
instanceTypeVCPU.With(prometheus.Labels{
instanceTypeLabel: *i.InstanceType,
}).Set(float64(aws.Int64Value(i.VCpuInfo.DefaultVCpus)))
instanceTypeMemory.With(prometheus.Labels{
instanceTypeLabel: *i.InstanceType,
}).Set(float64(aws.Int64Value(i.MemoryInfo.SizeInMiB) * 1024 * 1024))

zoneData := lo.Map(allZones.UnsortedList(), func(zoneName string, _ int) ZoneData {
if !p.instanceTypeOfferings[aws.StringValue(i.InstanceType)].Has(zoneName) || !subnetZones.Has(zoneName) {
return ZoneData{
Name: zoneName,
Available: false,
}
}
return ZoneData{
Name: zoneName,
ID: subnetZoneToID[zoneName],
Available: true,
}
})

// !!! Important !!!
// Any changes to the values passed into the NewInstanceType method will require making updates to the cache key
// Any changes to the values passed into the Generate method will require making updates to the cache key
// so that Karpenter is able to cache the set of InstanceTypes based on values that alter the set of instance types
// !!! Important !!!
return p.instanceTypeFilterMap(NewInstanceType(ctx, i, p.region,
nodeClass.Spec.BlockDeviceMappings, nodeClass.Spec.InstanceStorePolicy,
kc.MaxPods, kc.PodsPerCore, kc.KubeReserved, kc.SystemReserved, kc.EvictionHard, kc.EvictionSoft,
amiFamily, p.createOfferings(ctx, i, allZones, p.instanceTypeOfferings[aws.StringValue(i.InstanceType)], nodeClass.Status.Subnets),
), nodeClass)
it := p.instanceTypeGenerator.Generate(ctx, p.region, i, zoneData, nodeClass.AMIFamily(), nodeClass.Spec.BlockDeviceMappings, nodeClass.Spec.InstanceStorePolicy, kc)
for _, of := range it.Offerings {
instanceTypeOfferingAvailable.With(prometheus.Labels{
instanceTypeLabel: it.Name,
capacityTypeLabel: of.Requirements.Get(karpv1.CapacityTypeLabelKey).Any(),
zoneLabel: of.Requirements.Get(corev1.LabelTopologyZone).Any(),
}).Set(float64(lo.Ternary(of.Available, 1, 0)))
instanceTypeOfferingPriceEstimate.With(prometheus.Labels{
instanceTypeLabel: it.Name,
capacityTypeLabel: of.Requirements.Get(karpv1.CapacityTypeLabelKey).Any(),
zoneLabel: of.Requirements.Get(corev1.LabelTopologyZone).Any(),
}).Set(of.Price)
}
return it
})
p.instanceTypesCache.SetDefault(key, result)
return result, nil
}

func (p *DefaultProvider) LivenessProbe(req *http.Request) error {
if err := p.subnetProvider.LivenessProbe(req); err != nil {
return err
}
return p.pricingProvider.LivenessProbe(req)
}

func (p *DefaultProvider) UpdateInstanceTypes(ctx context.Context) error {
// DO NOT REMOVE THIS LOCK ----------------------------------------------------------------------------
// We lock here so that multiple callers to getInstanceTypeOfferings do not result in cache misses and multiple
Expand Down Expand Up @@ -273,68 +258,6 @@ func (p *DefaultProvider) UpdateInstanceTypeOfferings(ctx context.Context) error
return nil
}

// createOfferings creates a set of mutually exclusive offerings for a given instance type. This provider maintains an
// invariant that each offering is mutually exclusive. Specifically, there is an offering for each permutation of zone
// and capacity type. ZoneID is also injected into the offering requirements, when available, but there is a 1-1
// mapping between zone and zoneID so this does not change the number of offerings.
//
// Each requirement on the offering is guaranteed to have a single value. To get the value for a requirement on an
// offering, you can do the following thanks to this invariant:
//
// offering.Requirements.Get(v1.TopologyLabelZone).Any()
func (p *DefaultProvider) createOfferings(ctx context.Context, instanceType *ec2.InstanceTypeInfo, zones, instanceTypeZones sets.Set[string], subnets []v1.Subnet) []cloudprovider.Offering {
var offerings []cloudprovider.Offering
for zone := range zones {
// while usage classes should be a distinct set, there's no guarantee of that
for capacityType := range sets.NewString(aws.StringValueSlice(instanceType.SupportedUsageClasses)...) {
// exclude any offerings that have recently seen an insufficient capacity error from EC2
isUnavailable := p.unavailableOfferings.IsUnavailable(*instanceType.InstanceType, zone, capacityType)
var price float64
var ok bool
switch capacityType {
case ec2.UsageClassTypeSpot:
price, ok = p.pricingProvider.SpotPrice(*instanceType.InstanceType, zone)
case ec2.UsageClassTypeOnDemand:
price, ok = p.pricingProvider.OnDemandPrice(*instanceType.InstanceType)
case "capacity-block":
// ignore since karpenter doesn't support it yet, but do not log an unknown capacity type error
continue
default:
log.FromContext(ctx).WithValues("capacity-type", capacityType, "instance-type", *instanceType.InstanceType).Error(fmt.Errorf("received unknown capacity type"), "failed parsing offering")
continue
}

subnet, hasSubnet := lo.Find(subnets, func(s v1.Subnet) bool {
return s.Zone == zone
})
available := !isUnavailable && ok && instanceTypeZones.Has(zone) && hasSubnet
offering := cloudprovider.Offering{
Requirements: scheduling.NewRequirements(
scheduling.NewRequirement(karpv1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, capacityType),
scheduling.NewRequirement(corev1.LabelTopologyZone, corev1.NodeSelectorOpIn, zone),
),
Price: price,
Available: available,
}
if subnet.ZoneID != "" {
offering.Requirements.Add(scheduling.NewRequirement(v1.LabelTopologyZoneID, corev1.NodeSelectorOpIn, subnet.ZoneID))
}
offerings = append(offerings, offering)
instanceTypeOfferingAvailable.With(prometheus.Labels{
instanceTypeLabel: *instanceType.InstanceType,
capacityTypeLabel: capacityType,
zoneLabel: zone,
}).Set(float64(lo.Ternary(available, 1, 0)))
instanceTypeOfferingPriceEstimate.With(prometheus.Labels{
instanceTypeLabel: *instanceType.InstanceType,
capacityTypeLabel: capacityType,
zoneLabel: zone,
}).Set(price)
}
}
return offerings
}

func (p *DefaultProvider) Reset() {
p.instanceTypesInfo = []*ec2.InstanceTypeInfo{}
p.instanceTypeOfferings = map[string]sets.Set[string]{}
Expand Down
Loading

0 comments on commit ecb969b

Please sign in to comment.