diff --git a/pkg/networkplugin-syncer/handlers/ovn/connection.go b/pkg/networkplugin-syncer/handlers/ovn/connection.go index 20bff79164..9aceceb5b7 100644 --- a/pkg/networkplugin-syncer/handlers/ovn/connection.go +++ b/pkg/networkplugin-syncer/handlers/ovn/connection.go @@ -20,11 +20,11 @@ package ovn import ( "context" - "crypto/tls" - "crypto/x509" "os" "strings" + "crypto/tls" + "crypto/x509" "github.com/cenkalti/backoff/v4" libovsdbclient "github.com/ovn-org/libovsdb/client" "github.com/ovn-org/libovsdb/model" diff --git a/pkg/networkplugin-syncer/main.go b/pkg/networkplugin-syncer/main.go index f0498bdf5c..d75a30fe4d 100644 --- a/pkg/networkplugin-syncer/main.go +++ b/pkg/networkplugin-syncer/main.go @@ -19,102 +19,14 @@ limitations under the License. package main import ( - "flag" - "os" - - "github.com/kelseyhightower/envconfig" "github.com/submariner-io/admiral/pkg/log" - "github.com/submariner-io/admiral/pkg/log/kzerolog" - "github.com/submariner-io/submariner/pkg/cni" - "github.com/submariner-io/submariner/pkg/event" - "github.com/submariner-io/submariner/pkg/event/controller" - eventlogger "github.com/submariner-io/submariner/pkg/event/logger" - "github.com/submariner-io/submariner/pkg/networkplugin-syncer/handlers/ovn" - "github.com/submariner-io/submariner/pkg/routeagent_driver/environment" - "github.com/submariner-io/submariner/pkg/versions" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/manager/signals" ) var ( - masterURL string - kubeconfig string - logger = log.Logger{Logger: logf.Log.WithName("main")} + logger = log.Logger{Logger: logf.Log.WithName("main")} ) func main() { - kzerolog.AddFlags(nil) - flag.Parse() - kzerolog.InitK8sLogging() - - versions.Log(&logger) - - logger.Info("Starting submariner-networkplugin-syncer") - // set up signals so we handle the first shutdown signal gracefully - stopCh := signals.SetupSignalHandler().Done() - - var env environment.Specification - - err := envconfig.Process("submariner", &env) - logger.FatalOnError(err, "Error processing env config") - - networkPlugin := os.Getenv("SUBMARINER_NETWORKPLUGIN") - - if networkPlugin == "" { - networkPlugin = cni.Generic - } - - registry := event.NewRegistry("networkplugin-syncer", networkPlugin) - err = registry.AddHandlers(eventlogger.NewHandler(), ovn.NewSyncHandler(getK8sClient(), &env)) - logger.FatalOnError(err, "Error registering the handlers") - - if env.Uninstall { - if err := registry.StopHandlers(true); err != nil { - logger.Warningf("Error stopping handlers: %v", err) - } - - return - } - - ctl, err := controller.New(&controller.Config{ - Registry: registry, - MasterURL: masterURL, - Kubeconfig: kubeconfig, - }) - logger.FatalOnError(err, "Error creating controller for event handling") - - err = ctl.Start(stopCh) - logger.FatalOnError(err, "Error starting controller") - - <-stopCh - ctl.Stop() - - logger.Info("All controllers stopped or exited. Stopping submariner-networkplugin-syncer") -} - -func getK8sClient() kubernetes.Interface { - var cfg *rest.Config - var err error - - if masterURL == "" && kubeconfig == "" { - cfg, err = rest.InClusterConfig() - logger.FatalOnError(err, "Error getting in-cluster-config, please set kubeconfig and master parameters") - } else { - cfg, err = clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) - logger.FatalOnError(err, "Error building kubeconfig") - } - - clientSet, err := kubernetes.NewForConfig(cfg) - logger.FatalOnError(err, "Error building clientset") - - return clientSet -} - -func init() { - flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") - flag.StringVar(&masterURL, "master", "", - "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") + logger.Info("Network Plugin Syncer is deprecated will be removed in future") } diff --git a/pkg/routeagent_driver/handlers/ovn/connection.go b/pkg/routeagent_driver/handlers/ovn/connection.go new file mode 100644 index 0000000000..999b31b930 --- /dev/null +++ b/pkg/routeagent_driver/handlers/ovn/connection.go @@ -0,0 +1,160 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ovn + +import ( + "os" + "strings" + + "context" + "crypto/tls" + "crypto/x509" + "github.com/cenkalti/backoff/v4" + libovsdbclient "github.com/ovn-org/libovsdb/client" + "github.com/ovn-org/libovsdb/model" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/sbdb" + "github.com/pkg/errors" + "github.com/submariner-io/submariner/pkg/util/clusterfiles" + clientset "k8s.io/client-go/kubernetes" +) + +type ConnectionHandler struct { + k8sClientset clientset.Interface + nbdb libovsdbclient.Client +} + +func NewConnectionHandler(k8sClientset clientset.Interface) *ConnectionHandler { + return &ConnectionHandler{ + k8sClientset: k8sClientset, + } +} + +func (connectionHandler *ConnectionHandler) initClients() error { + var tlsConfig *tls.Config + + if strings.HasPrefix(getOVNNBDBAddress(), "ssl:") { + certFile, err := clusterfiles.Get(connectionHandler.k8sClientset, getOVNCertPath()) + if err != nil { + return errors.Wrapf(err, "error getting config for %q", getOVNCertPath()) + } + + pkFile, err := clusterfiles.Get(connectionHandler.k8sClientset, getOVNPrivKeyPath()) + if err != nil { + return errors.Wrapf(err, "error getting config for %q", getOVNPrivKeyPath()) + } + + caFile, err := clusterfiles.Get(connectionHandler.k8sClientset, getOVNCaBundlePath()) + if err != nil { + return errors.Wrapf(err, "error getting config for %q", getOVNCaBundlePath()) + } + + tlsConfig, err = getOVNTLSConfig(pkFile, certFile, caFile) + if err != nil { + return errors.Wrap(err, "error getting OVN TLS config") + } + } + + // Create nbdb client + nbdbModel, err := nbdb.FullDatabaseModel() + if err != nil { + return errors.Wrap(err, "error getting OVN NBDB database model") + } + + connectionHandler.nbdb, err = createLibovsdbClient(getOVNNBDBAddress(), tlsConfig, nbdbModel) + if err != nil { + return errors.Wrap(err, "error creating NBDB connection") + } + + return nil +} + +func getOVNTLSConfig(pkFile, certFile, caFile string) (*tls.Config, error) { + cert, err := tls.LoadX509KeyPair(certFile, pkFile) + if err != nil { + return nil, errors.Wrap(err, "Failure loading ovn certificates") + } + + rootCAs := x509.NewCertPool() + + data, err := os.ReadFile(caFile) + if err != nil { + return nil, errors.Wrap(err, "failure loading OVNDB ca bundle") + } + + rootCAs.AppendCertsFromPEM(data) + + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: rootCAs, + ServerName: "ovn", + MinVersion: tls.VersionTLS12, + }, nil +} + +func createLibovsdbClient(dbAddress string, tlsConfig *tls.Config, dbModel model.ClientDBModel) (libovsdbclient.Client, error) { + options := []libovsdbclient.Option{ + // Reading and parsing the DB after reconnect at scale can (unsurprisingly) + // take longer than a normal ovsdb operation. Give it a bit more time so + // we don't time out and enter a reconnect loop. + libovsdbclient.WithReconnect(OVSDBTimeout, &backoff.ZeroBackOff{}), + libovsdbclient.WithLogger(&logger.Logger), + } + + options = append(options, libovsdbclient.WithEndpoint(dbAddress)) + if tlsConfig != nil { + options = append(options, libovsdbclient.WithTLSConfig(tlsConfig)) + } + + client, err := libovsdbclient.NewOVSDBClient(dbModel, options...) + if err != nil { + return nil, errors.Wrap(err, "error creating ovsdbClient") + } + + ctx, cancel := context.WithTimeout(context.Background(), OVSDBTimeout) + defer cancel() + + err = client.Connect(ctx) + if err != nil { + return nil, errors.Wrap(err, "error connecting to ovsdb") + } + + if dbModel.Name() == "OVN_Northbound" { + _, err = client.MonitorAll(ctx) + if err != nil { + client.Close() + return nil, errors.Wrap(err, "error setting OVN NBDB client to monitor-all") + } + } else { + // Only Monitor Required SBDB tables to reduce memory overhead + _, err = client.Monitor(ctx, + client.NewMonitor( + libovsdbclient.WithTable(&sbdb.Chassis{}), + ), + ) + if err != nil { + client.Close() + return nil, errors.Wrap(err, "error monitoring chassis table in OVN SBDB") + } + } + + logger.Info("Client is %v", client) + + return client, nil +} diff --git a/pkg/routeagent_driver/handlers/ovn/env.go b/pkg/routeagent_driver/handlers/ovn/env.go new file mode 100644 index 0000000000..c66947a15b --- /dev/null +++ b/pkg/routeagent_driver/handlers/ovn/env.go @@ -0,0 +1,69 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ovn + +import ( + "os" + "time" +) + +// default OVSDB timeout used by ovn-k. +const ( + OVSDBTimeout = 20 * time.Second + ovnCert = "secret://openshift-ovn-kubernetes/ovn-cert/tls.crt" + ovnPrivKey = "secret://openshift-ovn-kubernetes/ovn-cert/tls.key" + ovnCABundle = "configmap://openshift-ovn-kubernetes/ovn-ca/ca-bundle.crt" + defaultOVNNBDB = "ssl:ovnkube-db.openshift-ovn-kubernetes.svc.cluster.local:9641" +) + +func getOVNNBDBAddress() string { + addr := os.Getenv("OVN_NBDB") + if addr == "" { + return defaultOVNNBDB + } + + return addr +} + +func getOVNPrivKeyPath() string { + key := os.Getenv("OVN_PK") + if key == "" { + return ovnPrivKey + } + + return key +} + +func getOVNCertPath() string { + cert := os.Getenv("OVN_CERT") + if cert == "" { + return ovnCert + } + + return cert +} + +func getOVNCaBundlePath() string { + ca := os.Getenv("OVN_CA") + if ca == "" { + return ovnCABundle + } + + return ca +} diff --git a/pkg/routeagent_driver/handlers/ovn/gateway_dataplane.go b/pkg/routeagent_driver/handlers/ovn/gateway_dataplane.go index a16d57dde7..c593d4b7e7 100644 --- a/pkg/routeagent_driver/handlers/ovn/gateway_dataplane.go +++ b/pkg/routeagent_driver/handlers/ovn/gateway_dataplane.go @@ -19,12 +19,10 @@ limitations under the License. package ovn import ( - "net" "os" "strconv" "github.com/pkg/errors" - npSyncerOvn "github.com/submariner-io/submariner/pkg/networkplugin-syncer/handlers/ovn" "github.com/submariner-io/submariner/pkg/routeagent_driver/constants" iptcommon "github.com/submariner-io/submariner/pkg/routeagent_driver/iptables" "github.com/vishvananda/netlink" @@ -41,7 +39,12 @@ func (ovn *Handler) cleanupGatewayDataplane() error { return errors.Wrapf(err, "error removing routing rule") } - err = netlink.RouteDel(ovn.getSubmDefaultRoute()) + defaultRoute, err := ovn.getSubmDefaultRoute() + if err != nil { + return errors.Wrap(err, "error creating default route") + } + + err = netlink.RouteDel(defaultRoute) if err != nil && !os.IsNotExist(err) { return errors.Wrap(err, "error deleting submariner default route") } @@ -71,7 +74,12 @@ func (ovn *Handler) updateGatewayDataplane() error { return errors.Wrapf(err, "error removing routing rule") } - err = netlink.RouteAdd(ovn.getSubmDefaultRoute()) + defaultRoute, err := ovn.getSubmDefaultRoute() + if err != nil { + return errors.Wrap(err, "error creating default route") + } + + err = netlink.RouteAdd(defaultRoute) if err != nil && !os.IsExist(err) { return errors.Wrap(err, "error adding submariner default") } @@ -150,15 +158,29 @@ func (ovn *Handler) setupForwardingIptables() error { } func (ovn *Handler) addNoMasqueradeIPTables(subnet string) error { - return errors.Wrapf(ovn.ipt.AppendUnique("nat", constants.SmPostRoutingChain, + err := errors.Wrapf(ovn.ipt.AppendUnique("nat", constants.SmPostRoutingChain, []string{"-d", subnet, "-j", "ACCEPT"}...), "error updating %q rules for subnet %q", constants.SmPostRoutingChain, subnet) + if err != nil { + return err + } + + return errors.Wrapf(ovn.ipt.AppendUnique("nat", constants.SmPostRoutingChain, + []string{"-s", subnet, "-j", "ACCEPT"}...), "error updating %q rules for subnet %q", + constants.SmPostRoutingChain, subnet) } func (ovn *Handler) removeNoMasqueradeIPTables(subnet string) error { - return errors.Wrapf(ovn.ipt.Delete("nat", constants.SmPostRoutingChain, + err := errors.Wrapf(ovn.ipt.Delete("nat", constants.SmPostRoutingChain, []string{"-d", subnet, "-j", "ACCEPT"}...), "error updating %q rules for subnet %q", constants.SmPostRoutingChain, subnet) + if err != nil { + return err + } + + return errors.Wrapf(ovn.ipt.Delete("nat", constants.SmPostRoutingChain, + []string{"-s", subnet, "-j", "ACCEPT"}...), "error updating %q rules for subnet %q", + constants.SmPostRoutingChain, subnet) } func (ovn *Handler) cleanupForwardingIptables() error { @@ -170,11 +192,16 @@ func (ovn *Handler) cleanupForwardingIptables() error { "error clearing chain %q", forwardingSubmarinerFWDChain) } -func (ovn *Handler) getSubmDefaultRoute() *netlink.Route { +func (ovn *Handler) getSubmDefaultRoute() (*netlink.Route, error) { + nextHop, err := ovn.getNextHopOnK8sMgmtIntf() + if err != nil { + return nil, errors.Wrapf(err, "getNextHopOnK8sMgmtIntf returned error") + } + return &netlink.Route{ - Gw: net.ParseIP(npSyncerOvn.SubmarinerUpstreamIP), + Gw: *nextHop, Table: constants.RouteAgentInterClusterNetworkTableID, - } + }, nil } func (ovn *Handler) initIPtablesChains() error { diff --git a/pkg/routeagent_driver/handlers/ovn/gateway_route_controller.go b/pkg/routeagent_driver/handlers/ovn/gateway_route_controller.go new file mode 100644 index 0000000000..f21ebd2c12 --- /dev/null +++ b/pkg/routeagent_driver/handlers/ovn/gateway_route_controller.go @@ -0,0 +1,126 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ovn + +import ( + "sync" + + "github.com/pkg/errors" + "github.com/submariner-io/admiral/pkg/federate" + "github.com/submariner-io/admiral/pkg/syncer" + submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" +) + +type GatewayRouteController struct { + resourceSyncer syncer.Interface + connectionHandler *ConnectionHandler + mutex sync.Mutex + remoteSubnets sets.Set[string] + stopCh chan struct{} + mgmtIP string +} + +func NewGatewayRoute(config *syncer.ResourceSyncerConfig, connectionHandler *ConnectionHandler, +) (*GatewayRouteController, error) { + var err error + + controller := &GatewayRouteController{ + connectionHandler: connectionHandler, + remoteSubnets: sets.New[string](), + } + + federator := federate.NewUpdateStatusFederator(config.SourceClient, config.RestMapper, corev1.NamespaceAll) + + controller.resourceSyncer, err = syncer.NewResourceSyncer(&syncer.ResourceSyncerConfig{ + Name: "GatewayRoute syncer", + ResourceType: &submarinerv1.GatewayRoute{}, + SourceClient: config.SourceClient, + SourceNamespace: "submariner-operator", + RestMapper: config.RestMapper, + Federator: federator, + Scheme: config.Scheme, + Transform: controller.process, + ResourcesEquivalent: syncer.AreSpecsEquivalent, + }) + + if err != nil { + return nil, errors.Wrap(err, "error creating resource syncer") + } + + mgmtIP, err := getNextHopOnK8sMgmtIntf() + if err != nil { + return nil, err + } + + controller.mgmtIP = mgmtIP + + err = controller.resourceSyncer.Start(controller.stopCh) + if err != nil { + return nil, errors.Wrapf(err, "error starting the resource syncer") + } + + logger.Infof("Started GatewayRouteController") + + return controller, nil +} + +func (g *GatewayRouteController) process(from runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) { + g.mutex.Lock() + defer g.mutex.Unlock() + + subMGWRoute := from.(*submarinerv1.GatewayRoute) + if subMGWRoute.RoutePolicySpec.NextHops[0] == g.mgmtIP { + if op == syncer.Create || op == syncer.Update { + for _, subnet := range subMGWRoute.RoutePolicySpec.RemoteCIDRs { + logger.Infof("Inserting Subnetes") + g.remoteSubnets.Insert(subnet) + } + } else { + for _, subnet := range subMGWRoute.RoutePolicySpec.RemoteCIDRs { + logger.Infof("In Delete function") + g.remoteSubnets.Delete(subnet) + } + } + + err := g.connectionHandler.ReconcileSubOvnLogicalRouterPolicies(g.remoteSubnets, g.mgmtIP) + if err != nil { + logger.Errorf(err, "error reconciling router policies for remote subnet %q", g.remoteSubnets) + return nil, true + } + + err = g.connectionHandler.ReconcileOvnLogicalRouterStaticRoutes(g.remoteSubnets, g.mgmtIP) + if err != nil { + logger.Errorf(err, "error reconciling static routes for remote subnet %q", g.remoteSubnets) + return nil, true + } + } + + return nil, false +} + +func (g *GatewayRouteController) Start() error { + return errors.Wrapf(g.resourceSyncer.Start(g.stopCh), "error starting gatewayroute controller") +} + +func (g *GatewayRouteController) Stop() { + close(g.stopCh) +} diff --git a/pkg/routeagent_driver/handlers/ovn/gateway_route_handler.go b/pkg/routeagent_driver/handlers/ovn/gateway_route_handler.go index 8f5e70836a..a22f777dc1 100644 --- a/pkg/routeagent_driver/handlers/ovn/gateway_route_handler.go +++ b/pkg/routeagent_driver/handlers/ovn/gateway_route_handler.go @@ -20,6 +20,7 @@ package ovn import ( "context" + "github.com/submariner-io/submariner/pkg/cni" "sync" "github.com/pkg/errors" @@ -72,9 +73,7 @@ func (h *GatewayRouteHandler) GetName() string { } func (h *GatewayRouteHandler) GetNetworkPlugins() []string { - // TODO enable when we switch to new implementation - // return []string{cni.OVNKubernetes} - return []string{} + return []string{cni.OVNKubernetes} } func (h *GatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error { @@ -152,7 +151,7 @@ func (h *GatewayRouteHandler) newGatewayRoute(endpoint *submarinerv1.Endpoint) * } } -//nolint // These functions are pass-through wrappers for the k8s APIs. +// nolint // These functions are pass-through wrappers for the k8s APIs. func (h *GatewayRouteHandler) gatewayResourceInterface(namespace string) resource.Interface { return &resource.InterfaceFuncs{ diff --git a/pkg/routeagent_driver/handlers/ovn/handler.go b/pkg/routeagent_driver/handlers/ovn/handler.go index 421bd04036..475fe01984 100644 --- a/pkg/routeagent_driver/handlers/ovn/handler.go +++ b/pkg/routeagent_driver/handlers/ovn/handler.go @@ -24,6 +24,10 @@ import ( "github.com/pkg/errors" "github.com/submariner-io/admiral/pkg/log" + "github.com/submariner-io/admiral/pkg/syncer" + "github.com/submariner-io/admiral/pkg/syncer/broker" + admUtil "github.com/submariner-io/admiral/pkg/util" + "github.com/submariner-io/admiral/pkg/watcher" submV1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" "github.com/submariner-io/submariner/pkg/cable/wireguard" "github.com/submariner-io/submariner/pkg/cidr" @@ -33,6 +37,9 @@ import ( "github.com/submariner-io/submariner/pkg/iptables" "github.com/submariner-io/submariner/pkg/netlink" "github.com/submariner-io/submariner/pkg/routeagent_driver/environment" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -41,6 +48,8 @@ type Handler struct { mutex sync.Mutex config *environment.Specification smClient clientset.Interface + k8sClientset *kubernetes.Clientset + watcherConfig *watcher.Config cableRoutingInterface *net.Interface localEndpoint *submV1.Endpoint remoteEndpoints map[string]*submV1.Endpoint @@ -52,7 +61,9 @@ type Handler struct { var logger = log.Logger{Logger: logf.Log.WithName("OVN")} -func NewHandler(env *environment.Specification, smClientSet clientset.Interface) *Handler { +func NewHandler(env *environment.Specification, smClientSet clientset.Interface, k8sClientset *kubernetes.Clientset, + watcherConfig *watcher.Config, +) *Handler { // We'll panic if env is nil, this is intentional ipt, err := iptables.New() if err != nil { @@ -62,6 +73,8 @@ func NewHandler(env *environment.Specification, smClientSet clientset.Interface) return &Handler{ config: env, smClient: smClientSet, + k8sClientset: k8sClientset, + watcherConfig: watcherConfig, remoteEndpoints: map[string]*submV1.Endpoint{}, netlink: netlink.New(), ipt: ipt, @@ -85,6 +98,58 @@ func (ovn *Handler) Init() error { ovn.startRouteConfigSyncer(ovn.stopCh) + connectionHandler := NewConnectionHandler(ovn.k8sClientset) + + err = connectionHandler.initClients() + if err != nil { + return errors.Wrapf(err, "Error getting connection handler to connect to OvnDB") + } + + if ovn.watcherConfig.RestMapper == nil { + if ovn.watcherConfig.RestMapper, err = admUtil.BuildRestMapper(ovn.watcherConfig.RestConfig); err != nil { + return errors.Wrap(err, "error creating the RestMapper") + } + } + + if ovn.watcherConfig.Client == nil { + if ovn.watcherConfig.Client, err = dynamic.NewForConfig(ovn.watcherConfig.RestConfig); err != nil { + return errors.Wrap(err, "error creating dynamic client") + } + } + + if ovn.watcherConfig.Scheme == nil { + ovn.watcherConfig.Scheme = scheme.Scheme + } + + syncerConfig := &syncer.ResourceSyncerConfig{ + SourceClient: ovn.watcherConfig.Client, + SourceNamespace: "submariner-operator", + Direction: syncer.None, + RestMapper: ovn.watcherConfig.RestMapper, + Federator: broker.NewFederator(ovn.watcherConfig.Client, ovn.watcherConfig.RestMapper, "submariner-operator", ""), + Scheme: ovn.watcherConfig.Scheme, + } + + gatewayRoute, err := NewGatewayRoute(syncerConfig, connectionHandler) + if err != nil { + return err + } + + err = gatewayRoute.Start() + if err != nil { + return err + } + + nonGatewayRoute, err := NewNonGatewayRoute(syncerConfig, connectionHandler, ovn.k8sClientset) + if err != nil { + return err + } + + err = nonGatewayRoute.Start() + if err != nil { + return err + } + return ovn.ensureSubmarinerNodeBridge() } @@ -134,7 +199,8 @@ func (ovn *Handler) LocalEndpointRemoved(endpoint *submV1.Endpoint) error { } func (ovn *Handler) RemoteEndpointCreated(endpoint *submV1.Endpoint) error { - if err := cidr.OverlappingSubnets(ovn.config.ServiceCidr, ovn.config.ClusterCidr, endpoint.Spec.Subnets); err != nil { + if err := cidr.OverlappingSubnets(ovn.config.ServiceCidr, ovn.config.ClusterCidr, + endpoint.Spec.Subnets); err != nil { // Skip processing the endpoint when CIDRs overlap and return nil to avoid re-queuing. logger.Errorf(err, "overlappingSubnets for new remote %#v returned error", endpoint) return nil diff --git a/pkg/routeagent_driver/handlers/ovn/non_gateway_route_controller.go b/pkg/routeagent_driver/handlers/ovn/non_gateway_route_controller.go new file mode 100644 index 0000000000..c139c4cfb2 --- /dev/null +++ b/pkg/routeagent_driver/handlers/ovn/non_gateway_route_controller.go @@ -0,0 +1,141 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ovn + +import ( + "context" + "os" + "sync" + + "github.com/pkg/errors" + "github.com/submariner-io/admiral/pkg/federate" + "github.com/submariner-io/admiral/pkg/syncer" + submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + clientset "k8s.io/client-go/kubernetes" +) + +type NonGatewayRouteController struct { + resourceSyncer syncer.Interface + connectionHandler *ConnectionHandler + mutex sync.Mutex + remoteSubnets sets.Set[string] + stopCh chan struct{} + transitSwitchIP string + k8sClientSet clientset.Interface +} + +func NewNonGatewayRoute(config *syncer.ResourceSyncerConfig, connectionHandler *ConnectionHandler, + k8sClientSet clientset.Interface, +) (*NonGatewayRouteController, error) { + // We'll panic if config is nil, this is intentional + var err error + + controller := &NonGatewayRouteController{ + connectionHandler: connectionHandler, + remoteSubnets: sets.New[string](), + k8sClientSet: k8sClientSet, + } + + federator := federate.NewUpdateStatusFederator(config.SourceClient, config.RestMapper, corev1.NamespaceAll) + + controller.resourceSyncer, err = syncer.NewResourceSyncer(&syncer.ResourceSyncerConfig{ + Name: "GatewayRoute syncer", + ResourceType: &submarinerv1.NonGatewayRoute{}, + SourceClient: config.SourceClient, + SourceNamespace: "submariner-operator", + RestMapper: config.RestMapper, + Federator: federator, + Scheme: config.Scheme, + Transform: controller.process, + ResourcesEquivalent: syncer.AreSpecsEquivalent, + }) + + if err != nil { + return nil, errors.Wrap(err, "error creating resource syncer") + } + + nodeName, ok := os.LookupEnv("NODE_NAME") + if !ok { + return nil, errors.New("error getting the Node name") + } + + node, err := controller.k8sClientSet.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return nil, errors.Wrapf(err, "error getting the g/w node: %q", nodeName) + } + + annotations := node.GetAnnotations() + + transitSwitchIP, ok := annotations["k8s.ovn.org/node-transit-switch-port-ifaddr"] + if !ok { + return nil, errors.Wrapf(err, "No transit switch IP configured") + } + + controller.transitSwitchIP, err = jsonToIP(transitSwitchIP) + if err != nil { + return nil, errors.Wrapf(err, "Error parsing transit switch IP") + } + + err = controller.resourceSyncer.Start(controller.stopCh) + if err != nil { + return nil, errors.Wrapf(err, "error starting non gateway route controller") + } + + logger.Infof("Started NonGatewayRouteController") + + return controller, nil +} + +func (g *NonGatewayRouteController) process(from runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) { + g.mutex.Lock() + defer g.mutex.Unlock() + + submGWRoute := from.(*submarinerv1.NonGatewayRoute) + if submGWRoute.RoutePolicySpec.NextHops[0] != g.transitSwitchIP { + if op == syncer.Create || op == syncer.Update { + for _, subnet := range submGWRoute.RoutePolicySpec.RemoteCIDRs { + g.remoteSubnets.Insert(subnet) + } + } else if op == syncer.Delete { + for _, subnet := range submGWRoute.RoutePolicySpec.RemoteCIDRs { + g.remoteSubnets.Delete(subnet) + } + } + + err := g.connectionHandler.ReconcileSubOvnLogicalRouterPolicies(g.remoteSubnets, submGWRoute.RoutePolicySpec.NextHops[0]) + if err != nil { + logger.Errorf(err, "error reconciling router policies for remote subnet %q", g.remoteSubnets) + return nil, true + } + } + + return nil, false +} + +func (g *NonGatewayRouteController) Start() error { + return errors.Wrapf(g.resourceSyncer.Start(g.stopCh), "error starting nongateway route controller") +} + +func (g *NonGatewayRouteController) Stop() { + close(g.stopCh) +} diff --git a/pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler.go b/pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler.go index c6ffc8d148..ce5d92556a 100644 --- a/pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler.go +++ b/pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler.go @@ -20,6 +20,7 @@ package ovn import ( "context" + "github.com/submariner-io/submariner/pkg/cni" "sync" "github.com/pkg/errors" @@ -82,9 +83,7 @@ func (h *NonGatewayRouteHandler) GetName() string { } func (h *NonGatewayRouteHandler) GetNetworkPlugins() []string { - // TODO enable when we switch to new implementation - // return []string{cni.OVNKubernetes} - return []string{} + return []string{cni.OVNKubernetes} } func (h *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error { @@ -171,7 +170,7 @@ func (h *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpo } } -//nolint // These functions are pass-through wrappers for the k8s APIs. +// nolint // These functions are pass-through wrappers for the k8s APIs. func (h *NonGatewayRouteHandler) nonGatewayResourceInterface(namespace string) resource.Interface { return &resource.InterfaceFuncs{ GetFunc: func(ctx context.Context, name string, options metav1.GetOptions) (runtime.Object, error) { diff --git a/pkg/routeagent_driver/handlers/ovn/ovn_logical_routes.go b/pkg/routeagent_driver/handlers/ovn/ovn_logical_routes.go new file mode 100644 index 0000000000..68c737279b --- /dev/null +++ b/pkg/routeagent_driver/handlers/ovn/ovn_logical_routes.go @@ -0,0 +1,128 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ovn + +import ( + "strings" + + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdbops" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb" + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/utils/ptr" +) + +const ( + ovnClusterRouter = "ovn_cluster_router" + ovnRoutePoliciesPrio = 20000 +) + +func (connectionHandler *ConnectionHandler) ReconcileOvnLogicalRouterStaticRoutes(remoteSubnets sets.Set[string], + nextHop string, +) error { + staleLRSRPred := func(item *nbdb.LogicalRouterStaticRoute) bool { + return item.Nexthop == nextHop && !remoteSubnets.Has(item.IPPrefix) + } + + err := libovsdbops.DeleteLogicalRouterStaticRoutesWithPredicate(connectionHandler.nbdb, ovnClusterRouter, staleLRSRPred) + if err != nil { + return errors.Wrapf(err, "Failed to list existing ovn logical route static routes for nexthop: %s", nextHop) + } + + lrsrToAdd := buildLRSRsFromSubnets(remoteSubnets.UnsortedList(), nextHop) + + for _, lrsr := range lrsrToAdd { + LRSRPred := func(item *nbdb.LogicalRouterStaticRoute) bool { + return item.Nexthop == nextHop && item.IPPrefix == lrsr.IPPrefix + } + + err = libovsdbops.CreateOrUpdateLogicalRouterStaticRoutesWithPredicate(connectionHandler.nbdb, ovnClusterRouter, lrsr, LRSRPred) + if err != nil { + return errors.Wrap(err, "Failed to create ovn lrsr and add it to the ovn submariner router") + } + } + + return nil +} + +func buildLRSRsFromSubnets(subnetsToAdd []string, nextHop string) []*nbdb.LogicalRouterStaticRoute { + toAdd := []*nbdb.LogicalRouterStaticRoute{} + + for _, subnet := range subnetsToAdd { + toAdd = append(toAdd, &nbdb.LogicalRouterStaticRoute{ + Nexthop: nextHop, + IPPrefix: subnet, + }) + } + + return toAdd +} + +func (connectionHandler *ConnectionHandler) ReconcileSubOvnLogicalRouterPolicies(remoteSubnets sets.Set[string], nextHop string) error { + lrpStalePredicate := func(item *nbdb.LogicalRouterPolicy) bool { + subnet := strings.Split(item.Match, " ")[2] + + return item.Priority == ovnRoutePoliciesPrio && !remoteSubnets.Has(subnet) + } + + // Cleanup any existing lrps not representing the correct set of remote subnets + err := libovsdbops.DeleteLogicalRouterPoliciesWithPredicate(connectionHandler.nbdb, ovnClusterRouter, lrpStalePredicate) + if err != nil { + return errors.Wrapf(err, "failed to delete stale submariner logical route policies") + } + + expectedLRPs := buildLRPsFromSubnets(remoteSubnets.UnsortedList(), nextHop) + + for _, lrp := range expectedLRPs { + lrpSubPredicate := func(item *nbdb.LogicalRouterPolicy) bool { + subnet1 := strings.Split(item.Match, " ")[2] + subnet2 := strings.Split(lrp.Match, " ")[2] + + return item.Priority == ovnRoutePoliciesPrio && subnet1 == subnet2 + } + + if err := libovsdbops.CreateOrUpdateLogicalRouterPolicyWithPredicate(connectionHandler.nbdb, + ovnClusterRouter, lrp, lrpSubPredicate); err != nil { + return errors.Wrapf(err, "failed to create submariner logical Router policy %v and add it to the ovn cluster router", lrp) + } + } + + return nil +} + +// getNorthSubnetsToAddAndRemove receives the existing state for the north (other clusters) routes in the OVN +// database, and based on the known remote endpoints it will return the elements that need +// to be added and removed. +func buildLRPsFromSubnets(subnetsToAdd []string, nextHop string) []*nbdb.LogicalRouterPolicy { + toAdd := []*nbdb.LogicalRouterPolicy{} + + for _, subnet := range subnetsToAdd { + toAdd = append(toAdd, &nbdb.LogicalRouterPolicy{ + Priority: ovnRoutePoliciesPrio, + Action: "reroute", + Match: "ip4.dst == " + subnet, + Nexthop: ptr.To(nextHop), + ExternalIDs: map[string]string{ + "submariner": "true", + }, + }) + } + + return toAdd +} diff --git a/pkg/routeagent_driver/main.go b/pkg/routeagent_driver/main.go index cd47a0b3f3..652a195ccd 100644 --- a/pkg/routeagent_driver/main.go +++ b/pkg/routeagent_driver/main.go @@ -28,6 +28,7 @@ import ( "github.com/pkg/errors" "github.com/submariner-io/admiral/pkg/log" "github.com/submariner-io/admiral/pkg/log/kzerolog" + "github.com/submariner-io/admiral/pkg/watcher" v1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned" cni "github.com/submariner-io/submariner/pkg/cni" @@ -39,11 +40,9 @@ import ( cniapi "github.com/submariner-io/submariner/pkg/routeagent_driver/cni" "github.com/submariner-io/submariner/pkg/routeagent_driver/constants" "github.com/submariner-io/submariner/pkg/routeagent_driver/environment" - "github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/calico" "github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/kubeproxy" "github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/mtu" "github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/ovn" - "github.com/submariner-io/submariner/pkg/versions" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/clientcmd" @@ -62,8 +61,6 @@ func main() { flag.Parse() kzerolog.InitK8sLogging() - versions.Log(&logger) - logger.Info("Starting submariner-route-agent using the event framework") // set up signals so we handle the first shutdown signal gracefully stopCh := signals.SetupSignalHandler().Done() @@ -99,17 +96,18 @@ func main() { np = cni.Generic } + config := &watcher.Config{RestConfig: cfg} + registry := event.NewRegistry("routeagent_driver", np) if err := registry.AddHandlers( eventlogger.NewHandler(), kubeproxy.NewSyncHandler(env.ClusterCidr, env.ServiceCidr), - ovn.NewHandler(&env, smClientset), - ovn.NewGatewayRouteHandler(&env, smClientset), + ovn.NewHandler(&env, smClientset, k8sClientSet, config), ovn.NewNonGatewayRouteHandler(smClientset, k8sClientSet), + ovn.NewGatewayRouteHandler(&env, smClientset), cabledriver.NewXRFMCleanupHandler(), cabledriver.NewVXLANCleanup(), mtu.NewMTUHandler(env.ClusterCidr, len(env.GlobalCidr) != 0, getTCPMssValue(k8sClientSet)), - calico.NewCalicoIPPoolHandler(cfg), ); err != nil { logger.Fatalf("Error registering the handlers: %s", err.Error()) }