Skip to content

Commit

Permalink
feat(route_sync): add health checking
Browse files Browse the repository at this point in the history
  • Loading branch information
aauren authored and mrueg committed Nov 21, 2024
1 parent 9ca1c61 commit da6ef9b
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 11 deletions.
9 changes: 4 additions & 5 deletions pkg/controllers/routing/network_routes_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ const (
type RouteSyncer interface {
AddInjectedRoute(dst *net.IPNet, route *netlink.Route)
DelInjectedRoute(dst *net.IPNet)
Run(stopCh <-chan struct{}, wg *sync.WaitGroup)
SyncLocalRouteTable()
Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup)
SyncLocalRouteTable() error
}

// PolicyBasedRouting is an interface that defines the methods needed to enable/disable policy based routing
Expand Down Expand Up @@ -310,7 +310,7 @@ func (nrc *NetworkRoutingController) Run(healthChan chan<- *healthcheck.Controll
klog.Infof("Starting network route controller")

// Start route syncer
nrc.routeSyncer.Run(stopCh, wg)
nrc.routeSyncer.Run(healthChan, stopCh, wg)

// Wait till we are ready to launch BGP server
for {
Expand Down Expand Up @@ -709,8 +709,7 @@ func (nrc *NetworkRoutingController) injectRoute(path *gobgpapi.Path) error {
klog.V(2).Infof("Inject route: '%s via %s' from peer to routing table", dst, nextHop)
nrc.routeSyncer.AddInjectedRoute(dst, route)
// Immediately sync the local route table regardless of timer
nrc.routeSyncer.SyncLocalRouteTable()
return nil
return nrc.routeSyncer.SyncLocalRouteTable()
}

func (nrc *NetworkRoutingController) isPeerEstablished(peerIP string) (bool, error) {
Expand Down
16 changes: 16 additions & 0 deletions pkg/healthcheck/health_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
NetworkServicesController
HairpinController
MetricsController
RouteSyncController
)

var (
Expand All @@ -34,6 +35,7 @@ var (
NetworkServicesController: "NetworkServicesController",
HairpinController: "HairpinController",
MetricsController: "MetricsController",
RouteSyncController: "RouteSyncController",
}
)

Expand Down Expand Up @@ -66,6 +68,8 @@ type HealthStats struct {
NetworkServicesControllerAliveTTL time.Duration
HairpinControllerAlive time.Time
HairpinControllerAliveTTL time.Duration
RouteSyncControllerAlive time.Time
RouteSyncControllerAliveTTL time.Duration
}

// SendHeartBeat sends a heartbeat on the passed channel
Expand Down Expand Up @@ -139,6 +143,12 @@ func (hc *HealthController) HandleHeartbeat(beat *ControllerHeartbeat) {
}
hc.Status.NetworkRoutingControllerAlive = beat.LastHeartBeat

case RouteSyncController:
if hc.Status.RouteSyncControllerAliveTTL == 0 {
hc.Status.RouteSyncControllerAliveTTL = time.Since(hc.Status.RouteSyncControllerAlive)
}
hc.Status.RouteSyncControllerAlive = beat.LastHeartBeat

case NetworkPolicyController:
if hc.Status.NetworkPolicyControllerAliveTTL == 0 {
hc.Status.NetworkPolicyControllerAliveTTL = time.Since(hc.Status.NetworkPolicyControllerAlive)
Expand Down Expand Up @@ -177,6 +187,11 @@ func (hc *HealthController) CheckHealth() bool {
klog.Error("Network Routing Controller heartbeat missed")
health = false
}
if time.Since(hc.Status.RouteSyncControllerAlive) >
hc.Config.InjectedRoutesSyncPeriod+hc.Status.RouteSyncControllerAliveTTL+graceTime {
klog.Error("Routes Sync Controller heartbeat missed")
health = false
}
}

if hc.Config.RunServiceProxy {
Expand Down Expand Up @@ -261,6 +276,7 @@ func (hc *HealthController) SetAlive() {
hc.Status.NetworkRoutingControllerAlive = now
hc.Status.NetworkServicesControllerAlive = now
hc.Status.HairpinControllerAlive = now
hc.Status.RouteSyncControllerAlive = now
}

// NewHealthController creates a new health controller and returns a reference to it
Expand Down
31 changes: 27 additions & 4 deletions pkg/routes/route_sync.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
package routes

import (
"fmt"
"net"
"sync"
"time"

"github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck"
"github.com/vishvananda/netlink"
"k8s.io/klog/v2"
)

type RouteSyncErr struct {
route *netlink.Route
err error
}

func (rse RouteSyncErr) Error() string {
return fmt.Sprintf("route (%s) encountered the following error while being acted upon: %v", rse.route, rse.err)
}

// RouteSync is a struct that holds all of the information needed for syncing routes to the kernel's routing table
type RouteSync struct {
routeTableStateMap map[string]*netlink.Route
Expand Down Expand Up @@ -36,21 +47,26 @@ func (rs *RouteSync) DelInjectedRoute(dst *net.IPNet) {
}

// syncLocalRouteTable iterates over the local route state map and syncs all routes to the kernel's routing table
func (rs *RouteSync) SyncLocalRouteTable() {
func (rs *RouteSync) SyncLocalRouteTable() error {
rs.mutex.Lock()
defer rs.mutex.Unlock()
klog.V(2).Infof("Running local route table synchronization")
for _, route := range rs.routeTableStateMap {
klog.V(3).Infof("Syncing route: %s -> %s via %s", route.Src, route.Dst, route.Gw)
err := rs.routeReplacer(route)
if err != nil {
klog.Errorf("Route could not be replaced due to : " + err.Error())
return RouteSyncErr{
route: route,
err: err,
}
}
}
return nil
}

// run starts a goroutine that calls syncLocalRouteTable on interval injectedRoutesSyncPeriod
func (rs *RouteSync) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
func (rs *RouteSync) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{},
wg *sync.WaitGroup) {
// Start route synchronization routine
wg.Add(1)
go func(stopCh <-chan struct{}, wg *sync.WaitGroup) {
Expand All @@ -60,7 +76,14 @@ func (rs *RouteSync) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
for {
select {
case <-t.C:
rs.SyncLocalRouteTable()
err := rs.SyncLocalRouteTable()
if err != nil {
klog.Errorf("route could not be replaced due to: %v", err)
}
// Some of our unit tests send a nil health channel
if nil != healthChan && err == nil {
healthcheck.SendHeartBeat(healthChan, healthcheck.RouteSyncController)
}
case <-stopCh:
klog.Infof("Shutting down local route synchronization")
return
Expand Down
6 changes: 4 additions & 2 deletions pkg/routes/route_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ func Test_syncLocalRouteTable(t *testing.T) {
// we try to use it in addInjectedRoute() below
myNetlink.wg = &sync.WaitGroup{}
myNetlink.wg.Add(1)
go syncer.SyncLocalRouteTable()
go func() {
_ = syncer.SyncLocalRouteTable()
}()

// Now we know that the syncLocalRouteTable() is paused on our artificial wait we added above
myNetlink.wg.Wait()
Expand Down Expand Up @@ -151,7 +153,7 @@ func Test_routeSyncer_run(t *testing.T) {
// For a sanity check that the currentRoute on the mock object is nil to start with as we'll rely on this later
assert.Nil(t, myNetLink.currentRoute, "currentRoute should be nil when the syncer hasn't run")

syncer.Run(stopCh, &wg)
syncer.Run(nil, stopCh, &wg)

time.Sleep(110 * time.Millisecond)

Expand Down

0 comments on commit da6ef9b

Please sign in to comment.