Skip to content

Commit

Permalink
Add UT for route agent health checker
Browse files Browse the repository at this point in the history
Signed-off-by: Aswin Suryanarayanan <asuryana@redhat.com>
  • Loading branch information
aswinsuryan committed Sep 5, 2024
1 parent be1d40a commit 3cef4f7
Show file tree
Hide file tree
Showing 4 changed files with 334 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
SPDX-License-Identifier: Apache-2.0
Copyright Contributors to the Submariner project.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package healthchecker_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/log/kzerolog"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
"k8s.io/client-go/kubernetes/scheme"
)

func init() {
kzerolog.AddFlags(nil)
}

var _ = BeforeSuite(func() {
kzerolog.InitK8sLogging()
Expect(submarinerv1.AddToScheme(scheme.Scheme)).To(Succeed())
})

func TestHealthChecker(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "RouteAgent Health Checker")
}
18 changes: 10 additions & 8 deletions pkg/routeagent_driver/handlers/healthchecker/healthchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,12 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

var RouteAgentUpdateInterval = 60 * time.Second

type Config struct {
PingInterval uint
MaxPacketLossCount uint
HealthCheckerEnabled bool
NewPinger func(pinger.Config) pinger.Interface
PingInterval uint
MaxPacketLossCount uint
HealthCheckerEnabled bool
RouteAgentUpdateInterval time.Duration
NewPinger func(pinger.Config) pinger.Interface
}

type controller struct {
Expand Down Expand Up @@ -84,7 +83,10 @@ func (h *controller) Stop() error {

h.pingers = map[string]pinger.Interface{}

close(h.stopCh)
if h.stopCh != nil {
close(h.stopCh)
h.stopCh = nil
}

err := h.client.Delete(context.TODO(),
h.localNodeName, metav1.DeleteOptions{})
Expand Down Expand Up @@ -179,7 +181,7 @@ func (h *controller) Init() error {
defer h.Unlock()

h.syncRouteAgentStatus()
}, RouteAgentUpdateInterval, h.stopCh)
}, h.config.RouteAgentUpdateInterval, h.stopCh)
}()

return nil
Expand Down
277 changes: 277 additions & 0 deletions pkg/routeagent_driver/handlers/healthchecker/healthchecker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
/*
SPDX-License-Identifier: Apache-2.0
Copyright Contributors to the Submariner project.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package healthchecker_test

import (
"context"
"fmt"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
fakeClient "github.com/submariner-io/submariner/pkg/client/clientset/versioned/fake"
submarinerv1client "github.com/submariner-io/submariner/pkg/client/clientset/versioned/typed/submariner.io/v1"
"github.com/submariner-io/submariner/pkg/event"
eventtesting "github.com/submariner-io/submariner/pkg/event/testing"
"github.com/submariner-io/submariner/pkg/pinger"
"github.com/submariner-io/submariner/pkg/pinger/fake"
"github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/healthchecker"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
dynamicfake "k8s.io/client-go/dynamic/fake"
kubeScheme "k8s.io/client-go/kubernetes/scheme"
)

const (
namespace = "submariner"
remoteClusterID = "west"
healthCheckIP1 = "1.1.1.1"
healthCheckIP2 = "2.2.2.2"
)

var _ = Describe("RouteAgent Health Checker", func() {
t := newTestDriver()

When("syncRouteAgentStatus is triggered", func() {
It("should create or update the RouteAgent object and check LatencyInfo", func() {
endpoint1 := t.CreateEndpoint(t.newSubmEndpoint())
err = t.handler.RemoteEndpointCreated(endpoint1)
Expect(err).NotTo(HaveOccurred())

latencyInfo := t.newLatencyInfo()
t.setLatencyInfo(healthCheckIP1, latencyInfo)

var routeAgent *submarinerv1.RouteAgent
Eventually(func() *submarinerv1.LatencyRTTSpec {
var err error
routeAgent, err = t.client.Get(context.TODO(), t.localNodeName, metav1.GetOptions{})
if err == nil && len(routeAgent.Status.RemoteEndpoints) != 0 && routeAgent.Status.RemoteEndpoints[0].LatencyRTT != nil {
return routeAgent.Status.RemoteEndpoints[0].LatencyRTT
}
return nil
}, time.Second*50, time.Millisecond*500).Should(Equal(latencyInfo.Spec))

remoteEndpoint := routeAgent.Status.RemoteEndpoints[0]
Expect(remoteEndpoint.Spec.CableName).To(Equal(endpoint1.Spec.CableName))
Expect(remoteEndpoint.Spec.HealthCheckIP).To(Equal(endpoint1.Spec.HealthCheckIP))
})
When("a RemoteEndpoint is updated", func() {
var endpoint1 *submarinerv1.Endpoint
var routeAgent *submarinerv1.RouteAgent
When("the HealthCheckIP was changed", func() {
BeforeEach(func() {
endpoint1 = t.CreateEndpoint(t.newSubmEndpoint())
err := t.handler.RemoteEndpointCreated(endpoint1)
Expect(err).NotTo(HaveOccurred())

t.pingerMap[healthCheckIP1].AwaitStart()
})

It("should update the pinger and not create a new one if the HealthCheckIP remains the same", func() {
err := t.handler.RemoteEndpointUpdated(endpoint1)
Expect(err).NotTo(HaveOccurred())

pingerObject, found := t.pingerMap[endpoint1.Spec.HealthCheckIP]
Expect(found).To(BeTrue())
Expect(pingerObject.GetIP()).To(Equal(healthCheckIP1))

t.pingerMap[healthCheckIP1].AwaitNoStop()

latencyInfo := t.newLatencyInfo()
t.setLatencyInfo(healthCheckIP1, latencyInfo)

Eventually(func() *submarinerv1.LatencyRTTSpec {
var err error
routeAgent, err = t.client.Get(context.TODO(), t.localNodeName, metav1.GetOptions{})
if err == nil && len(routeAgent.Status.RemoteEndpoints) != 0 && routeAgent.Status.RemoteEndpoints[0].LatencyRTT != nil {
return routeAgent.Status.RemoteEndpoints[0].LatencyRTT
}
return nil
}, time.Second*50, time.Millisecond*500).Should(Equal(latencyInfo.Spec))
})
})
When("the HealthCheckIP was changed", func() {
BeforeEach(func() {
endpoint1 = t.CreateEndpoint(t.newSubmEndpoint())
err := t.handler.RemoteEndpointCreated(endpoint1)
Expect(err).NotTo(HaveOccurred())

t.pingerMap[healthCheckIP1].AwaitStart()
t.pingerMap[healthCheckIP2] = fake.NewPinger(healthCheckIP2)
})

It("should replace the pinger if the HealthCheckIP changes", func() {
endpoint1.Spec.HealthCheckIP = healthCheckIP2
err := t.handler.RemoteEndpointUpdated(endpoint1)
Expect(err).NotTo(HaveOccurred())

t.pingerMap[healthCheckIP1].AwaitStop()
t.pingerMap[healthCheckIP2].AwaitStart()
})
})
})
When("a RemoteEndpoint is deleted", func() {
var endpoint1 *submarinerv1.Endpoint

BeforeEach(func() {
endpoint1 = t.CreateEndpoint(t.newSubmEndpoint())
err := t.handler.RemoteEndpointCreated(endpoint1)
Expect(err).NotTo(HaveOccurred())

t.pingerMap[healthCheckIP1].AwaitStart()
})

It("should stop and remove the pinger for the deleted HealthCheckIP", func() {
err := t.handler.RemoteEndpointRemoved(endpoint1)
Expect(err).NotTo(HaveOccurred())

t.pingerMap[healthCheckIP1].AwaitStop()
})
})
When("transitioning to a non-gateway state", func() {
BeforeEach(func() {
endpoint1 := t.CreateEndpoint(t.newSubmEndpoint())
err := t.handler.RemoteEndpointCreated(endpoint1)
Expect(err).NotTo(HaveOccurred())
t.pingerMap[healthCheckIP1].AwaitStart()
})

It("should start pingers for all remote endpoints", func() {
err := t.handler.TransitionToGateway()
Expect(err).NotTo(HaveOccurred())

t.pingerMap[healthCheckIP1].AwaitStop()

err = t.handler.TransitionToNonGateway()
Expect(err).NotTo(HaveOccurred())

t.pingerMap[healthCheckIP1].AwaitStart()
})
})
})
})

type testDriver struct {
*eventtesting.ControllerSupport
pingerMap map[string]*fake.Pinger
handler event.Handler
endpoints dynamic.ResourceInterface
client submarinerv1client.RouteAgentInterface
stopCh chan struct{}
localNodeName string
}

func newTestDriver() *testDriver {
t := &testDriver{
ControllerSupport: eventtesting.NewControllerSupport(),
}

BeforeEach(func() {
t.stopCh = make(chan struct{})
scheme := runtime.NewScheme()
Expect(submarinerv1.AddToScheme(scheme)).To(Succeed())
Expect(submarinerv1.AddToScheme(kubeScheme.Scheme)).To(Succeed())

clientset := fakeClient.NewSimpleClientset()

dynamicClient := dynamicfake.NewSimpleDynamicClient(kubeScheme.Scheme)

t.endpoints = dynamicClient.Resource(submarinerv1.SchemeGroupVersion.WithResource("endpoints")).Namespace(namespace)
t.client = clientset.SubmarinerV1().RouteAgents(namespace)
t.pingerMap = map[string]*fake.Pinger{
healthCheckIP1: fake.NewPinger(healthCheckIP1),
}

t.localNodeName = "nodeName"

config := &healthchecker.Config{
PingInterval: 1, // Set interval to 1 second for faster testing
MaxPacketLossCount: 1,
HealthCheckerEnabled: true,
RouteAgentUpdateInterval: 2 * time.Second,
}

config.NewPinger = func(pingerCfg pinger.Config) pinger.Interface {
defer GinkgoRecover()
Expect(pingerCfg.Interval).To(Equal(time.Second *
time.Duration(config.PingInterval))) //nolint:gosec // We can safely ignore integer conversion error
Expect(pingerCfg.MaxPacketLossCount).To(Equal(config.MaxPacketLossCount))

p, ok := t.pingerMap[pingerCfg.IP]
Expect(ok).To(BeTrue())

return p
}

t.handler = healthchecker.New(config, t.client, "v1", t.localNodeName)
Expect(t.handler.Init()).To(Succeed())

t.Start(t.handler)
})

AfterEach(func() {
close(t.stopCh)
})

return t
}

func (t *testDriver) newSubmEndpoint() *submarinerv1.Endpoint {
endpointSpec := &submarinerv1.EndpointSpec{
ClusterID: remoteClusterID,
CableName: fmt.Sprintf("submariner-cable-%s-192-68-1-20", remoteClusterID),
HealthCheckIP: healthCheckIP1,
}

endpointName, err := endpointSpec.GenerateName()
Expect(err).To(Succeed())

endpoint := &submarinerv1.Endpoint{
ObjectMeta: metav1.ObjectMeta{
Name: endpointName,
},
Spec: *endpointSpec,
}

return endpoint
}

func (t *testDriver) newLatencyInfo() *pinger.LatencyInfo {
return &pinger.LatencyInfo{
ConnectionStatus: pinger.Connected,
Spec: &submarinerv1.LatencyRTTSpec{
Last: "82ms",
Min: "80ms",
Average: "85ms",
Max: "89ms",
StdDev: "5ms",
},
}
}

func (t *testDriver) setLatencyInfo(ip string, latencyInfo *pinger.LatencyInfo) {
pingerObject := t.pingerMap[ip]
pingerObject.SetLatencyInfo(latencyInfo)
}

func (t *testDriver) Start(handler event.Handler) {
t.ControllerSupport.Start(handler)
}
7 changes: 4 additions & 3 deletions pkg/routeagent_driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,10 @@ func main() {
logger.FatalOnError(err, "Error getting information on the local node")

healthcheckerConfig := &healthchecker.Config{
PingInterval: submSpec.HealthCheckInterval * 60,
MaxPacketLossCount: submSpec.HealthCheckMaxPacketLossCount,
HealthCheckerEnabled: submSpec.HealthCheckEnabled,
PingInterval: submSpec.HealthCheckInterval * 60,
MaxPacketLossCount: submSpec.HealthCheckMaxPacketLossCount,
HealthCheckerEnabled: submSpec.HealthCheckEnabled,
RouteAgentUpdateInterval: 60 * time.Second,
}

registry, err := event.NewRegistry("routeagent_driver", np,
Expand Down

0 comments on commit 3cef4f7

Please sign in to comment.