diff --git a/config/config.go b/config/config.go index 3de9251b93..3f7cf0963a 100644 --- a/config/config.go +++ b/config/config.go @@ -285,6 +285,8 @@ type Config struct { OpenPolicyAgentStartupTimeout time.Duration `yaml:"open-policy-agent-startup-timeout"` OpenPolicyAgentMaxRequestBodySize int64 `yaml:"open-policy-agent-max-request-body-size"` OpenPolicyAgentMaxMemoryBodyParsing int64 `yaml:"open-policy-agent-max-memory-body-parsing"` + + PassiveHealthCheck mapFlags `yaml:"passive-health-check"` } const ( @@ -571,6 +573,9 @@ func NewConfig() *Config { flag.Var(cfg.LuaModules, "lua-modules", "comma separated list of lua filter modules. Use . to selectively enable module symbols, for example: package,base._G,base.print,json") flag.Var(cfg.LuaSources, "lua-sources", `comma separated list of lua input types for the lua() filter. Valid sources "", "file", "inline", "file,inline" and "none". Use "file" to only allow lua file references in lua filter. Default "" is the same as "file","inline". Use "none" to disable lua filters.`) + // Passive Health Checks + flag.Var(&cfg.PassiveHealthCheck, "passive-health-check", "sets the parameters for passive health check feature") + cfg.flags = flag return cfg } @@ -912,6 +917,8 @@ func (c *Config) ToOptions() skipper.Options { OpenPolicyAgentStartupTimeout: c.OpenPolicyAgentStartupTimeout, OpenPolicyAgentMaxRequestBodySize: c.OpenPolicyAgentMaxRequestBodySize, OpenPolicyAgentMaxMemoryBodyParsing: c.OpenPolicyAgentMaxMemoryBodyParsing, + + PassiveHealthCheck: c.PassiveHealthCheck.values, } for _, rcci := range c.CloneRoute { eskipClone := eskip.NewClone(rcci.Reg, rcci.Repl) diff --git a/docs/operation/operation.md b/docs/operation/operation.md index cf327d2fb5..b530f1f339 100644 --- a/docs/operation/operation.md +++ b/docs/operation/operation.md @@ -893,6 +893,34 @@ to get the results paginated or getting all of them at the same time. curl localhost:9911/routes?offset=200&limit=100 ``` +## Passive health check (*experimental*) + +Skipper has an option to automatically detect and mitigate faulty backend endpoints, this feature is called +Passive Health Check(PHC). + +PHC works the following way: the entire uptime is divided in chunks of `period`, per every period Skipper calculates +the total amount of requests and amount of requests failed per every endpoint. While next period is going on, +the Skipper takes a look at previous period and if the amount of requests in the previous period is more than `min-requests` +for the given endpoints then Skipper will send reduced (the more `max-drop-probability` +and failed requests ratio in previous period are, the stronger reduction is) +amount of requests compared to amount sent without PHC. + +Having this, we expect less requests to fail because a lot of them would be sent to endpoints that seem to be healthy instead. + +To enable this feature, you need to provide `-passive-health-check` option having all forementioned parameters +(`period`, `min-requests`, `max-drop-probability`) defined, +for instance: `-passive-health-check=period=1s,min-requests=10,max-drop-probability=0.9`. + +You need to define all parameters on your side, there are no defaults, and skipper will not run if PHC params are passed only partially. + +However, Skipper will run without this feature, if no `-passive-health-check` is provided at all. + +The parameters of `-passive-health-check` option are: ++ `period=` - the duration of stats reset period ++ `min-requests=` - the minimum number of requests per `period` per backend endpoint required to activate PHC for this endpoint ++ `max-drop-probabilty=` - the maximum possible probability of unhealthy endpoint being not considered +while choosing the endpoint for the given request + ## Memory consumption While Skipper is generally not memory bound, some features may require diff --git a/filters/fadein/fadein_test.go b/filters/fadein/fadein_test.go index f3c9b9ab93..f03347a17c 100644 --- a/filters/fadein/fadein_test.go +++ b/filters/fadein/fadein_test.go @@ -236,6 +236,7 @@ func TestPostProcessor(t *testing.T) { ` endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() rt, _ := createRouting(t, routes, endpointRegistry) foo := route(rt, "/foo") @@ -266,6 +267,7 @@ func TestPostProcessor(t *testing.T) { ` endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() rt, _ := createRouting(t, routes, endpointRegistry) r := route(rt, "/") if r != nil { @@ -278,13 +280,14 @@ func TestPostProcessor(t *testing.T) { * -> fadeIn("-1m") -> <"http://10.0.0.1:8080"> ` - endpointRegisty := routing.NewEndpointRegistry(routing.RegistryOptions{}) - rt, _ := createRouting(t, routes, endpointRegisty) + endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() + rt, _ := createRouting(t, routes, endpointRegistry) r := route(rt, "/") if r == nil || len(r.LBEndpoints) == 0 { t.Fatal("failed to ignore negative duration") } - if endpointRegisty.GetMetrics(r.LBEndpoints[0].Host).DetectedTime().IsZero() { + if endpointRegistry.GetMetrics(r.LBEndpoints[0].Host).DetectedTime().IsZero() { t.Fatal("failed to ignore negative duration") } }) @@ -295,6 +298,7 @@ func TestPostProcessor(t *testing.T) { ` endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() rt, update := createRouting(t, routes, endpointRegistry) firstDetected := time.Now() @@ -327,6 +331,7 @@ func TestPostProcessor(t *testing.T) { ` endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() rt, update := createRouting(t, initialRoutes, endpointRegistry) firstDetected := time.Now() @@ -362,6 +367,7 @@ func TestPostProcessor(t *testing.T) { const lastSeenTimeout = 2 * time.Second endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{LastSeenTimeout: lastSeenTimeout}) + defer endpointRegistry.Close() rt, update := createRouting(t, initialRoutes, endpointRegistry) firstDetected := time.Now() @@ -397,6 +403,7 @@ func TestPostProcessor(t *testing.T) { ` endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() routes := fmt.Sprintf(routesFmt, nows(t)) rt, update := createRouting(t, routes, endpointRegistry) firstDetected := time.Now() diff --git a/loadbalancer/algorithm_test.go b/loadbalancer/algorithm_test.go index d33ba693ed..e48986c9ee 100644 --- a/loadbalancer/algorithm_test.go +++ b/loadbalancer/algorithm_test.go @@ -31,6 +31,7 @@ func TestSelectAlgorithm(t *testing.T) { t.Run("LB route with default algorithm", func(t *testing.T) { p := NewAlgorithmProvider() endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() r := &routing.Route{ Route: eskip.Route{ BackendType: eskip.LBBackend, @@ -59,6 +60,7 @@ func TestSelectAlgorithm(t *testing.T) { t.Run("LB route with explicit round-robin algorithm", func(t *testing.T) { p := NewAlgorithmProvider() endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() r := &routing.Route{ Route: eskip.Route{ BackendType: eskip.LBBackend, @@ -88,6 +90,7 @@ func TestSelectAlgorithm(t *testing.T) { t.Run("LB route with explicit consistentHash algorithm", func(t *testing.T) { p := NewAlgorithmProvider() endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() r := &routing.Route{ Route: eskip.Route{ BackendType: eskip.LBBackend, @@ -117,6 +120,7 @@ func TestSelectAlgorithm(t *testing.T) { t.Run("LB route with explicit random algorithm", func(t *testing.T) { p := NewAlgorithmProvider() endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() r := &routing.Route{ Route: eskip.Route{ BackendType: eskip.LBBackend, @@ -146,6 +150,7 @@ func TestSelectAlgorithm(t *testing.T) { t.Run("LB route with explicit powerOfRandomNChoices algorithm", func(t *testing.T) { p := NewAlgorithmProvider() endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() r := &routing.Route{ Route: eskip.Route{ BackendType: eskip.LBBackend, @@ -260,6 +265,7 @@ func TestApply(t *testing.T) { req, _ := http.NewRequest("GET", "http://127.0.0.1:1234/foo", nil) p := NewAlgorithmProvider() endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() r := &routing.Route{ Route: eskip.Route{ BackendType: eskip.LBBackend, @@ -293,6 +299,7 @@ func TestConsistentHashSearch(t *testing.T) { apply := func(key string, endpoints []string) string { p := NewAlgorithmProvider() endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() r := &routing.Route{ Route: eskip.Route{ BackendType: eskip.LBBackend, @@ -349,6 +356,7 @@ func TestConsistentHashBoundedLoadSearch(t *testing.T) { Params: map[string]interface{}{ConsistentHashBalanceFactor: 1.25}, } endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() endpointRegistry.Do([]*routing.Route{route}) noLoad := ch.Apply(ctx) nonBounded := ch.Apply(&routing.LBContext{Request: r, Route: route, LBEndpoints: route.LBEndpoints, Params: map[string]interface{}{}}) @@ -429,6 +437,7 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) { Params: map[string]interface{}{ConsistentHashBalanceFactor: balanceFactor}, } endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() endpointRegistry.Do([]*routing.Route{route}) for i := 0; i < 100; i++ { diff --git a/metricsinit_test.go b/metricsinit_test.go index d2a94e8223..ff9740254e 100644 --- a/metricsinit_test.go +++ b/metricsinit_test.go @@ -53,6 +53,11 @@ func TestInitOrderAndDefault(t *testing.T) { SwarmRedisURLs: []string{fmt.Sprintf("localhost:%d", redisPort)}, EnableRatelimiters: true, SwarmRedisConnMetricsInterval: ringMetricsUpdatePeriod, + PassiveHealthCheck: map[string]string{ + "period": "1m", + "min-requests": "10", + "max-drop-probability": "0.9", + }, } tornDown := make(chan struct{}) diff --git a/proxy/fadein_internal_test.go b/proxy/fadein_internal_test.go index 8b073b715a..8ebf92a9f4 100644 --- a/proxy/fadein_internal_test.go +++ b/proxy/fadein_internal_test.go @@ -92,7 +92,7 @@ func initializeEndpoints(endpointAges []float64, algorithmName string, fadeInDur registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i]) } - proxy := &Proxy{registry: registry, fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: registry}} + proxy := &Proxy{registry: registry, fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: registry}, quit: make(chan struct{})} return route, proxy, eps } @@ -103,6 +103,7 @@ func calculateFadeInDuration(t *testing.T, algorithmName string, endpointAges [] const precalculateRatio = 10 route, proxy, _ := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDuration) + defer proxy.Close() rnd := rand.New(rand.NewSource(time.Now().UnixNano())) t.Log("preemulation start", time.Now()) @@ -125,6 +126,7 @@ func testFadeInMonotony( t.Run(name, func(t *testing.T) { fadeInDuration := calculateFadeInDuration(t, algorithmName, endpointAges) route, proxy, eps := initializeEndpoints(endpointAges, algorithmName, fadeInDuration) + defer proxy.Close() t.Log("test start", time.Now()) var stats []string @@ -273,6 +275,7 @@ func testFadeInLoadBetweenOldAndNewEps( } route, proxy, eps := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDurationHuge) + defer proxy.Close() rnd := rand.New(rand.NewSource(time.Now().UnixNano())) nReqs := map[string]int{} @@ -330,6 +333,7 @@ func testSelectEndpointEndsWhenAllEndpointsAreFading( // Initialize every endpoint with zero: every endpoint is new endpointAges := make([]float64, nEndpoints) route, proxy, _ := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDurationHuge) + defer proxy.Close() applied := make(chan struct{}) go func() { @@ -364,6 +368,7 @@ func benchmarkFadeIn( ) { b.Run(name, func(b *testing.B) { route, proxy, _ := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDurationHuge) + defer proxy.Close() var wg sync.WaitGroup // Emulate the load balancer loop, sending requests to it with random hash keys diff --git a/proxy/healthy_endpoints.go b/proxy/healthy_endpoints.go new file mode 100644 index 0000000000..b79a132769 --- /dev/null +++ b/proxy/healthy_endpoints.go @@ -0,0 +1,34 @@ +package proxy + +import ( + "math/rand" + + "github.com/zalando/skipper/routing" +) + +type healthyEndpoints struct { + rnd *rand.Rand + endpointRegistry *routing.EndpointRegistry +} + +func (h *healthyEndpoints) filterHealthyEndpoints(endpoints []routing.LBEndpoint, rt *routing.Route) []routing.LBEndpoint { + if h == nil { + return endpoints + } + + p := h.rnd.Float64() + + filtered := make([]routing.LBEndpoint, 0, len(endpoints)) + for _, e := range endpoints { + if p < e.Metrics.HealthCheckDropProbability() { + /* drop */ + } else { + filtered = append(filtered, e) + } + } + + if len(filtered) == 0 { + return endpoints + } + return filtered +} diff --git a/proxy/healthy_endpoints_test.go b/proxy/healthy_endpoints_test.go index 3e5d0b7be9..b07e7bbc8b 100644 --- a/proxy/healthy_endpoints_test.go +++ b/proxy/healthy_endpoints_test.go @@ -14,13 +14,53 @@ import ( ) const ( - nRequests = 10_000 + nRequests = 15_000 rtFailureProbability = 0.8 - period = 1 * time.Second + period = 100 * time.Millisecond ) func defaultEndpointRegistry() *routing.EndpointRegistry { - return routing.NewEndpointRegistry(routing.RegistryOptions{}) + return routing.NewEndpointRegistry(routing.RegistryOptions{ + PassiveHealthCheckEnabled: true, + StatsResetPeriod: period, + MinRequests: 10, + MaxHealthCheckDropProbability: 1.0, + }) +} + +func TestPHCWithoutRequests(t *testing.T) { + services := []*httptest.Server{} + for i := 0; i < 3; i++ { + service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + services = append(services, service) + defer service.Close() + } + endpointRegistry := defaultEndpointRegistry() + + doc := fmt.Sprintf(`* -> `, services[0].URL, services[1].URL, services[2].URL) + tp, err := newTestProxyWithParams(doc, Params{ + EnablePassiveHealthCheck: true, + EndpointRegistry: endpointRegistry, + }) + if err != nil { + t.Fatal(err) + } + defer tp.close() + + ps := httptest.NewServer(tp.proxy) + defer ps.Close() + + rsp, err := ps.Client().Get(ps.URL) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, http.StatusOK, rsp.StatusCode) + rsp.Body.Close() + + time.Sleep(10 * period) + /* this test is needed to check PHC will not crash without requests sent during period at all */ } func TestPHCForSingleHealthyEndpoint(t *testing.T) { @@ -32,7 +72,8 @@ func TestPHCForSingleHealthyEndpoint(t *testing.T) { doc := fmt.Sprintf(`* -> "%s"`, service.URL) tp, err := newTestProxyWithParams(doc, Params{ - EndpointRegistry: endpointRegistry, + EnablePassiveHealthCheck: true, + EndpointRegistry: endpointRegistry, }) if err != nil { t.Fatal(err) @@ -55,27 +96,6 @@ func TestPHCForSingleHealthyEndpoint(t *testing.T) { rsp.Body.Close() } assert.Equal(t, 0, failedReqs) - - // Let endpointregistry update all stats - time.Sleep(period + time.Millisecond) - dummy := fmt.Sprintf(`Header("Foo", "Bar") -> "%s"`, service.URL) - tp.dc.UpdateDoc(dummy, nil) - tp.dc.UpdateDoc(doc, nil) - time.Sleep(10 * time.Millisecond) - - failedReqs = 0 - for i := 0; i < nRequests; i++ { - rsp, err := ps.Client().Get(ps.URL) - if err != nil { - t.Fatal(err) - } - - if rsp.StatusCode != http.StatusOK { - failedReqs++ - } - rsp.Body.Close() - } - assert.Equal(t, 0, failedReqs) } func TestPHCForMultipleHealthyEndpoints(t *testing.T) { @@ -91,7 +111,8 @@ func TestPHCForMultipleHealthyEndpoints(t *testing.T) { doc := fmt.Sprintf(`* -> `, services[0].URL, services[1].URL, services[2].URL) tp, err := newTestProxyWithParams(doc, Params{ - EndpointRegistry: endpointRegistry, + EnablePassiveHealthCheck: true, + EndpointRegistry: endpointRegistry, }) if err != nil { t.Fatal(err) @@ -114,27 +135,6 @@ func TestPHCForMultipleHealthyEndpoints(t *testing.T) { rsp.Body.Close() } assert.Equal(t, 0, failedReqs) - - // Let endpointregistry update all stats - time.Sleep(period + time.Millisecond) - dummy := fmt.Sprintf(`* -> `, services[0].URL, services[1].URL) - tp.dc.UpdateDoc(dummy, nil) - tp.dc.UpdateDoc(doc, nil) - time.Sleep(10 * time.Millisecond) - - failedReqs = 0 - for i := 0; i < nRequests; i++ { - rsp, err := ps.Client().Get(ps.URL) - if err != nil { - t.Fatal(err) - } - - if rsp.StatusCode != http.StatusOK { - failedReqs++ - } - rsp.Body.Close() - } - assert.Equal(t, 0, failedReqs) } type roundTripperUnhealthyHost struct { @@ -144,7 +144,7 @@ type roundTripperUnhealthyHost struct { rnd *rand.Rand } -type RoundTripperUnhealthyHostParams struct { +type RoundTripperUnhealthyHostOptions struct { Host string Probability float64 } @@ -158,7 +158,7 @@ func (rt *roundTripperUnhealthyHost) RoundTrip(r *http.Request) (*http.Response, return rt.inner.RoundTrip(r) } -func newRoundTripperUnhealthyHost(o *RoundTripperUnhealthyHostParams) func(r http.RoundTripper) http.RoundTripper { +func newRoundTripperUnhealthyHost(o *RoundTripperUnhealthyHostOptions) func(r http.RoundTripper) http.RoundTripper { return func(r http.RoundTripper) http.RoundTripper { return &roundTripperUnhealthyHost{inner: r, rnd: rand.New(loadbalancer.NewLockedSource()), host: o.Host, probability: o.Probability} } @@ -177,8 +177,9 @@ func TestPHCForMultipleHealthyAndOneUnhealthyEndpoints(t *testing.T) { doc := fmt.Sprintf(`* -> `, services[0].URL, services[1].URL, services[2].URL) tp, err := newTestProxyWithParams(doc, Params{ + EnablePassiveHealthCheck: true, EndpointRegistry: endpointRegistry, - CustomHttpRoundTripperWrap: newRoundTripperUnhealthyHost(&RoundTripperUnhealthyHostParams{Host: services[0].URL[7:], Probability: rtFailureProbability}), + CustomHttpRoundTripperWrap: newRoundTripperUnhealthyHost(&RoundTripperUnhealthyHostOptions{Host: services[0].URL[7:], Probability: rtFailureProbability}), }) if err != nil { t.Fatal(err) @@ -200,28 +201,5 @@ func TestPHCForMultipleHealthyAndOneUnhealthyEndpoints(t *testing.T) { } rsp.Body.Close() } - assert.InDelta(t, 0.33*rtFailureProbability*nRequests, failedReqs, 0.05*nRequests) - - // Let endpointregistry update all stats - time.Sleep(period + time.Millisecond) - dummy := fmt.Sprintf(`* -> `, services[0].URL, services[1].URL) - tp.dc.UpdateDoc(dummy, nil) - tp.dc.UpdateDoc(doc, nil) - time.Sleep(10 * time.Millisecond) - - failedReqs = 0 - for i := 0; i < nRequests; i++ { - rsp, err := ps.Client().Get(ps.URL) - if err != nil { - t.Fatal(err) - } - - if rsp.StatusCode != http.StatusOK { - failedReqs++ - } - rsp.Body.Close() - } - assert.InDelta(t, 0.33*rtFailureProbability*nRequests, failedReqs, 0.05*nRequests) - // After PHC is implemented, I expect failed requests to decrease like this: - // assert.InDelta(t, 0.33*rtFailureProbability*(1.0-rtFailureProbability)*nRequests, failedReqs, 0.05*nRequests) + assert.InDelta(t, 0.33*rtFailureProbability*(1.0-rtFailureProbability)*float64(nRequests), failedReqs, 0.1*float64(nRequests)) } diff --git a/proxy/proxy.go b/proxy/proxy.go index 5503ca511e..d2156d8f55 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -145,6 +145,69 @@ type OpenTracingParams struct { ExcludeTags []string } +type PassiveHealthCheck struct { + // The period of time after which the endpointregistry begins to calculate endpoints statistics + // from scratch + Period time.Duration + + // The minimum number of total requests that should be sent to an endpoint in a single period to + // potentially opt out the endpoint from the list of healthy endpoints + MinRequests int64 + + // The maximum probability of unhealthy endpoint to be dropped out from load balancing for every specific request + MaxDropProbability float64 +} + +func InitPassiveHealthChecker(o map[string]string) (bool, *PassiveHealthCheck, error) { + if len(o) == 0 { + return false, &PassiveHealthCheck{}, nil + } + + result := &PassiveHealthCheck{} + keysInitialized := make(map[string]struct{}) + + for key, value := range o { + switch key { + case "period": + period, err := time.ParseDuration(value) + if err != nil { + return false, nil, fmt.Errorf("passive health check: invalid period value: %s", value) + } + if period < 0 { + return false, nil, fmt.Errorf("passive health check: invalid period value: %s", value) + } + result.Period = period + case "min-requests": + minRequests, err := strconv.Atoi(value) + if err != nil { + return false, nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value) + } + if minRequests < 0 { + return false, nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value) + } + result.MinRequests = int64(minRequests) + case "max-drop-probability": + maxDropProbability, err := strconv.ParseFloat(value, 64) + if err != nil { + return false, nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value) + } + if maxDropProbability < 0 || maxDropProbability > 1 { + return false, nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value) + } + result.MaxDropProbability = maxDropProbability + default: + return false, nil, fmt.Errorf("passive health check: invalid parameter: key=%s,value=%s", key, value) + } + + keysInitialized[key] = struct{}{} + } + + if len(keysInitialized) != 3 { + return false, nil, fmt.Errorf("passive health check: missing required parameters") + } + return true, result, nil +} + // Proxy initialization options. type Params struct { // The proxy expects a routing instance that is used to match @@ -247,6 +310,12 @@ type Params struct { // and returns some metadata about endpoint. Information about the metadata // returned from the registry could be found in routing.Metrics interface. EndpointRegistry *routing.EndpointRegistry + + // EnablePassiveHealthCheck enables the healthy endpoints checker + EnablePassiveHealthCheck bool + + // PassiveHealthCheck defines the parameters for the healthy endpoints checker. + PassiveHealthCheck *PassiveHealthCheck } type ( @@ -324,6 +393,7 @@ type Proxy struct { routing *routing.Routing registry *routing.EndpointRegistry fadein *fadeIn + heathlyEndpoints *healthyEndpoints roundTripper http.RoundTripper priorityRoutes []PriorityRoute flags Flags @@ -466,6 +536,7 @@ func (p *Proxy) selectEndpoint(ctx *context) *routing.LBEndpoint { rt := ctx.route endpoints := rt.LBEndpoints endpoints = p.fadein.filterFadeIn(endpoints, rt) + endpoints = p.heathlyEndpoints.filterHealthyEndpoints(endpoints, rt) lbctx := &routing.LBContext{ Request: ctx.request, @@ -718,10 +789,21 @@ func WithParams(p Params) *Proxy { hostname := os.Getenv("HOSTNAME") + var healthyEndpointsChooser *healthyEndpoints + if p.EnablePassiveHealthCheck { + healthyEndpointsChooser = &healthyEndpoints{ + rnd: rand.New(loadbalancer.NewLockedSource()), + endpointRegistry: p.EndpointRegistry, + } + } return &Proxy{ - routing: p.Routing, - registry: p.EndpointRegistry, - fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: p.EndpointRegistry}, + routing: p.Routing, + registry: p.EndpointRegistry, + fadein: &fadeIn{ + rnd: rand.New(loadbalancer.NewLockedSource()), + endpointRegistry: p.EndpointRegistry, + }, + heathlyEndpoints: healthyEndpointsChooser, roundTripper: p.CustomHttpRoundTripperWrap(tr), priorityRoutes: p.PriorityRoutes, flags: p.Flags, @@ -888,7 +970,9 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co req = injectClientTrace(req, ctx.proxySpan) response, err := roundTripper.RoundTrip(req) - + if endpointMetrics != nil { + endpointMetrics.IncRequests(routing.IncRequestsOptions{FailedRoundTrip: err != nil}) + } ctx.proxySpan.LogKV("http_roundtrip", EndEvent) if err != nil { if errors.Is(err, skpio.ErrBlocked) { @@ -1501,6 +1585,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { // It's primary purpose is to support testing. func (p *Proxy) Close() error { close(p.quit) + p.registry.Close() return nil } diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 6b8018fa6c..0ba1fce3eb 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -208,12 +208,14 @@ func newTestProxyWithFiltersAndParams(fr filters.Registry, doc string, params Pa } tl := loggingtest.New() - endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + if params.EndpointRegistry == nil { + params.EndpointRegistry = routing.NewEndpointRegistry(routing.RegistryOptions{}) + } opts := routing.Options{ FilterRegistry: fr, PollTimeout: sourcePollTimeout, DataClients: []routing.DataClient{dc}, - PostProcessors: []routing.PostProcessor{loadbalancer.NewAlgorithmProvider(), endpointRegistry}, + PostProcessors: []routing.PostProcessor{loadbalancer.NewAlgorithmProvider(), params.EndpointRegistry}, Log: tl, Predicates: []routing.PredicateSpec{teePredicate.New()}, } @@ -2278,3 +2280,133 @@ func BenchmarkAccessLogEnablePrint(b *testing.B) { benchmarkAccessLog(b, "enableAccessLog(1,200,3)", 200) } func BenchmarkAccessLogEnable(b *testing.B) { benchmarkAccessLog(b, "enableAccessLog(1,3)", 200) } + +func TestInitPassiveHealthChecker(t *testing.T) { + for i, ti := range []struct { + inputArg map[string]string + expectedEnabled bool + expectedParams *PassiveHealthCheck + expectedError error + }{ + { + inputArg: map[string]string{}, + expectedEnabled: false, + expectedParams: nil, + expectedError: nil, + }, + { + inputArg: map[string]string{ + "period": "somethingInvalid", + "min-requests": "10", + "max-drop-probability": "0.9", + }, + expectedEnabled: false, + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid period value: somethingInvalid"), + }, + { + inputArg: map[string]string{ + "period": "1m", + "min-requests": "10", + "max-drop-probability": "0.9", + }, + expectedEnabled: true, + expectedParams: &PassiveHealthCheck{ + Period: 1 * time.Minute, + MinRequests: 10, + MaxDropProbability: 0.9, + }, + expectedError: nil, + }, + { + inputArg: map[string]string{ + "period": "-1m", + "min-requests": "10", + "max-drop-probability": "0.9", + }, + expectedEnabled: false, + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid period value: -1m"), + }, + { + inputArg: map[string]string{ + "period": "1m", + "min-requests": "somethingInvalid", + "max-drop-probability": "0.9", + }, + expectedEnabled: false, + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid minRequests value: somethingInvalid"), + }, + { + inputArg: map[string]string{ + "period": "1m", + "min-requests": "-10", + "max-drop-probability": "0.9", + }, + expectedEnabled: false, + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid minRequests value: -10"), + }, + { + inputArg: map[string]string{ + "period": "1m", + "min-requests": "10", + "max-drop-probability": "somethingInvalid", + }, + expectedEnabled: false, + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: somethingInvalid"), + }, + { + inputArg: map[string]string{ + "period": "1m", + "min-requests": "10", + "max-drop-probability": "-0.1", + }, + expectedEnabled: false, + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: -0.1"), + }, + { + inputArg: map[string]string{ + "period": "1m", + "min-requests": "10", + "max-drop-probability": "3.1415", + }, + expectedEnabled: false, + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: 3.1415"), + }, + { + inputArg: map[string]string{ + "period": "1m", + "min-requests": "10", + "max-drop-probability": "0.9", + "non-existing": "non-existing", + }, + expectedEnabled: false, + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid parameter: key=non-existing,value=non-existing"), + }, + { + inputArg: map[string]string{ + "period": "1m", + "min-requests": "10", + /* forgot max-drop-probability */ + }, + expectedEnabled: false, + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: missing required parameters"), + }, + } { + t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { + enabled, params, err := InitPassiveHealthChecker(ti.inputArg) + assert.Equal(t, ti.expectedEnabled, enabled) + assert.Equal(t, ti.expectedError, err) + if enabled { + assert.Equal(t, ti.expectedParams, params) + } + }) + } +} diff --git a/proxy/proxytest/proxytest.go b/proxy/proxytest/proxytest.go index 091e970523..4d11f29e15 100644 --- a/proxy/proxytest/proxytest.go +++ b/proxy/proxytest/proxytest.go @@ -88,6 +88,7 @@ func (c Config) Create() *TestProxy { rt := routing.New(c.RoutingOptions) c.ProxyParams.Routing = rt + c.ProxyParams.EndpointRegistry = endpointRegistry pr := proxy.WithParams(c.ProxyParams) diff --git a/redis_test.go b/redis_test.go index 9eac1b1924..e4429763a2 100644 --- a/redis_test.go +++ b/redis_test.go @@ -109,6 +109,7 @@ spec: defer dc.Close() endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() // create LB in front of apiservers to be able to switch the data served by apiserver ro := routing.Options{ @@ -283,6 +284,7 @@ spec: defer dc.Close() endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() // create LB in front of apiservers to be able to switch the data served by apiserver ro := routing.Options{ diff --git a/routing/endpointregistry.go b/routing/endpointregistry.go index f7b94315b1..e43cb7f867 100644 --- a/routing/endpointregistry.go +++ b/routing/endpointregistry.go @@ -22,12 +22,24 @@ type Metrics interface { InflightRequests() int64 IncInflightRequest() DecInflightRequest() + + IncRequests(o IncRequestsOptions) + HealthCheckDropProbability() float64 +} + +type IncRequestsOptions struct { + FailedRoundTrip bool } type entry struct { detected atomic.Value // time.Time lastSeen atomic.Value // time.Time inflightRequests atomic.Int64 + + totalRequests [2]atomic.Int64 + totalFailedRoundTrips [2]atomic.Int64 + curSlot atomic.Int64 + healthCheckDropProbability atomic.Value // float64 } var _ Metrics = &entry{} @@ -60,23 +72,46 @@ func (e *entry) SetLastSeen(ts time.Time) { e.lastSeen.Store(ts) } +func (e *entry) IncRequests(o IncRequestsOptions) { + curSlot := e.curSlot.Load() + e.totalRequests[curSlot].Add(1) + if o.FailedRoundTrip { + e.totalFailedRoundTrips[curSlot].Add(1) + } +} + +func (e *entry) HealthCheckDropProbability() float64 { + return e.healthCheckDropProbability.Load().(float64) +} + func newEntry() *entry { result := &entry{} + result.healthCheckDropProbability.Store(0.0) result.SetDetected(time.Time{}) result.SetLastSeen(time.Time{}) return result } type EndpointRegistry struct { - lastSeenTimeout time.Duration - now func() time.Time - data sync.Map // map[string]*entry + lastSeenTimeout time.Duration + statsResetPeriod time.Duration + minRequests int64 + maxHealthCheckDropProbability float64 + + quit chan struct{} + + now func() time.Time + data sync.Map // map[string]*entry } var _ PostProcessor = &EndpointRegistry{} type RegistryOptions struct { - LastSeenTimeout time.Duration + LastSeenTimeout time.Duration + PassiveHealthCheckEnabled bool + StatsResetPeriod time.Duration + MinRequests int64 + MaxHealthCheckDropProbability float64 } func (r *EndpointRegistry) Do(routes []*Route) []*Route { @@ -115,16 +150,64 @@ func (r *EndpointRegistry) Do(routes []*Route) []*Route { return routes } +func (r *EndpointRegistry) updateStats() { + ticker := time.NewTicker(r.statsResetPeriod) + + for { + r.data.Range(func(key, value any) bool { + e := value.(*entry) + + curSlot := e.curSlot.Load() + nextSlot := (curSlot + 1) % 2 + e.totalFailedRoundTrips[nextSlot].Store(0) + e.totalRequests[nextSlot].Store(0) + + failed := e.totalFailedRoundTrips[curSlot].Load() + requests := e.totalRequests[curSlot].Load() + if requests > r.minRequests { + failedRoundTripsRatio := float64(failed) / float64(requests) + e.healthCheckDropProbability.Store(min(failedRoundTripsRatio, r.maxHealthCheckDropProbability)) + } else { + e.healthCheckDropProbability.Store(0.0) + } + e.curSlot.Store(nextSlot) + + return true + }) + + select { + case <-r.quit: + return + case <-ticker.C: + } + } +} + +func (r *EndpointRegistry) Close() { + close(r.quit) +} + func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry { if o.LastSeenTimeout == 0 { o.LastSeenTimeout = defaultLastSeenTimeout } - return &EndpointRegistry{ - data: sync.Map{}, - lastSeenTimeout: o.LastSeenTimeout, - now: time.Now, + registry := &EndpointRegistry{ + lastSeenTimeout: o.LastSeenTimeout, + statsResetPeriod: o.StatsResetPeriod, + minRequests: o.MinRequests, + maxHealthCheckDropProbability: o.MaxHealthCheckDropProbability, + + quit: make(chan struct{}), + + now: time.Now, + data: sync.Map{}, + } + if o.PassiveHealthCheckEnabled { + go registry.updateStats() } + + return registry } func (r *EndpointRegistry) GetMetrics(hostPort string) Metrics { diff --git a/routing/endpointregistry_test.go b/routing/endpointregistry_test.go index 59e2e59c3d..d6627a1e39 100644 --- a/routing/endpointregistry_test.go +++ b/routing/endpointregistry_test.go @@ -14,6 +14,8 @@ import ( func TestEmptyRegistry(t *testing.T) { r := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer r.Close() + m := r.GetMetrics("some key") assert.Equal(t, time.Time{}, m.DetectedTime()) @@ -24,6 +26,7 @@ func TestEmptyRegistry(t *testing.T) { func TestSetAndGet(t *testing.T) { now := time.Now() r := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer r.Close() mBefore := r.GetMetrics("some key") assert.Equal(t, time.Time{}, mBefore.DetectedTime()) @@ -47,6 +50,7 @@ func TestSetAndGet(t *testing.T) { func TestSetAndGetAnotherKey(t *testing.T) { now := time.Now() r := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer r.Close() mToChange := r.GetMetrics("some key") mToChange.IncInflightRequest() @@ -66,6 +70,7 @@ func TestSetAndGetAnotherKey(t *testing.T) { func TestDoRemovesOldEntries(t *testing.T) { beginTestTs := time.Now() r := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer r.Close() routing.SetNow(r, func() time.Time { return beginTestTs @@ -117,6 +122,7 @@ func TestDoRemovesOldEntries(t *testing.T) { func TestEndpointRegistryPostProcessor(t *testing.T) { beginTestTs := time.Now() r := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer r.Close() routing.SetNow(r, func() time.Time { return beginTestTs @@ -163,6 +169,8 @@ func TestEndpointRegistryPostProcessor(t *testing.T) { func TestMetricsMethodsDoNotAllocate(t *testing.T) { r := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer r.Close() + metrics := r.GetMetrics("some key") now := time.Now() metrics.SetDetected(now.Add(-time.Hour)) @@ -186,6 +194,7 @@ func TestMetricsMethodsDoNotAllocate(t *testing.T) { func TestRaceReadWrite(t *testing.T) { r := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer r.Close() duration := time.Second wg := sync.WaitGroup{} @@ -219,6 +228,7 @@ func TestRaceReadWrite(t *testing.T) { func TestRaceTwoWriters(t *testing.T) { r := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer r.Close() duration := time.Second wg := sync.WaitGroup{} diff --git a/skipper.go b/skipper.go index f83b063654..445a30f9ef 100644 --- a/skipper.go +++ b/skipper.go @@ -917,6 +917,8 @@ type Options struct { OpenPolicyAgentStartupTimeout time.Duration OpenPolicyAgentMaxRequestBodySize int64 OpenPolicyAgentMaxMemoryBodyParsing int64 + + PassiveHealthCheck map[string]string } func (o *Options) KubernetesDataClientOptions() kubernetes.Options { @@ -1915,8 +1917,18 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { }) defer schedulerRegistry.Close() + passiveHealthCheckEnabled, passiveHealthCheck, err := proxy.InitPassiveHealthChecker(o.PassiveHealthCheck) + if err != nil { + return err + } + // create a routing engine - endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{ + PassiveHealthCheckEnabled: passiveHealthCheckEnabled, + StatsResetPeriod: passiveHealthCheck.Period, + MinRequests: passiveHealthCheck.MinRequests, + MaxHealthCheckDropProbability: passiveHealthCheck.MaxDropProbability, + }) ro := routing.Options{ FilterRegistry: o.filterRegistry(), MatchingOptions: mo, @@ -1982,6 +1994,8 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { proxyParams := proxy.Params{ Routing: routing, EndpointRegistry: endpointRegistry, + EnablePassiveHealthCheck: passiveHealthCheckEnabled, + PassiveHealthCheck: passiveHealthCheck, Flags: proxyFlags, PriorityRoutes: o.PriorityRoutes, IdleConnectionsPerHost: o.IdleConnectionsPerHost, diff --git a/skipper_test.go b/skipper_test.go index f03116e898..3a6c275997 100644 --- a/skipper_test.go +++ b/skipper_test.go @@ -528,6 +528,7 @@ func TestDataClients(t *testing.T) { defer reg.Close() endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + defer endpointRegistry.Close() // create LB in front of apiservers to be able to switch the data served by apiserver ro := routing.Options{