diff --git a/pkg/synchromanager/clustersynchro/cluster_synchro.go b/pkg/synchromanager/clustersynchro/cluster_synchro.go index d13107ea6..ca7ef3e9d 100644 --- a/pkg/synchromanager/clustersynchro/cluster_synchro.go +++ b/pkg/synchromanager/clustersynchro/cluster_synchro.go @@ -22,6 +22,7 @@ import ( resourceconfigfactory "github.com/clusterpedia-io/clusterpedia/pkg/runtime/resourceconfig/factory" "github.com/clusterpedia-io/clusterpedia/pkg/storage" "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/features" + "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro" clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature" ) @@ -36,11 +37,12 @@ type ClusterSynchro struct { RESTConfig *rest.Config ClusterStatusUpdater ClusterStatusUpdater - storage storage.StorageFactory - syncConfig ClusterSyncConfig - healthChecker *healthChecker - dynamicDiscovery discovery.DynamicDiscoveryInterface - listerWatcherFactory informer.DynamicListerWatcherFactory + storage storage.StorageFactory + resourceSynchroFactory resourcesynchro.SynchroFactory + syncConfig ClusterSyncConfig + healthChecker *healthChecker + dynamicDiscovery discovery.DynamicDiscoveryInterface + listerWatcherFactory informer.DynamicListerWatcherFactory closeOnce sync.Once closer chan struct{} @@ -123,6 +125,12 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, updat storageResourceVersions: make(map[schema.GroupVersionResource]map[string]interface{}), } + if factory, ok := storage.(resourcesynchro.SynchroFactory); ok { + synchro.resourceSynchroFactory = factory + } else { + synchro.resourceSynchroFactory = DefaultResourceSynchroFactory{} + } + var refresherOnce sync.Once synchro.dynamicDiscovery.Prepare(discovery.PrepareConfig{ ResourceMutationHandler: synchro.resetSyncResources, @@ -167,8 +175,9 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, updat func (s *ClusterSynchro) GetMetricsWriterList() (writers metricsstore.MetricsWriterList) { s.storageResourceSynchros.Range(func(_, value interface{}) bool { - if synchro := value.(*ResourceSynchro); synchro.metricsWriter != nil { - writers = append(writers, synchro.metricsWriter) + synchro := value.(resourcesynchro.Synchro) + if writer := synchro.GetMetricsWriter(); writer != nil { + writers = append(writers, writer) } return true }) @@ -237,7 +246,7 @@ func (s *ClusterSynchro) Shutdown(updateStatus bool) { shutdownCount := 0 statuses := make(map[string][]string) s.storageResourceSynchros.Range(func(key, value interface{}) bool { - synchro := value.(*ResourceSynchro) + synchro := value.(resourcesynchro.Synchro) status := synchro.Status() if status.Status == clusterv1alpha2.ResourceSyncStatusStop && status.Reason == "" { shutdownCount++ @@ -245,7 +254,7 @@ func (s *ClusterSynchro) Shutdown(updateStatus bool) { } gvr := key.(schema.GroupVersionResource) - sr := fmt.Sprintf("%s,%s,%s", status.Status, status.Reason, synchro.runningStage) + sr := fmt.Sprintf("%s,%s,%s", status.Status, status.Reason, synchro.Stage()) msg := fmt.Sprintf("%s/%s/%s", gvr.Group, gvr.Version, gvr.Resource) statuses[sr] = append(statuses[sr], msg) return true @@ -359,18 +368,23 @@ func (s *ClusterSynchro) refreshSyncResources() { if s.syncConfig.MetricsStoreBuilder != nil { metricsStore = s.syncConfig.MetricsStoreBuilder.GetMetricStore(s.name, config.syncResource) } - synchro := newResourceSynchro(s.name, - ResourceSynchroConfig{ + synchro, err := s.resourceSynchroFactory.NewResourceSynchro(s.name, + resourcesynchro.Config{ GroupVersionResource: config.syncResource, Kind: config.kind, ListerWatcher: s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource), ObjectConvertor: config.convertor, - ResourceStorage: resourceStorage, MetricsStore: metricsStore, ResourceVersions: rvs, PageSizeForInformer: s.syncConfig.PageSizeForResourceSync, + ResourceStorage: resourceStorage, }, ) + if err != nil { + klog.ErrorS(err, "Failed to create resource synchro", "cluster", s.name, "storage resource", storageGVR) + updateSyncConditions(storageGVR, clusterv1alpha2.ResourceSyncStatusPending, "SynchroCreateFailed", fmt.Sprintf("new resource synchro failed: %s", err)) + continue + } s.waitGroup.StartWithChannel(s.closer, synchro.Run) s.storageResourceSynchros.Store(storageGVR, synchro) @@ -400,7 +414,7 @@ func (s *ClusterSynchro) refreshSyncResources() { for storageGVR := range removedStorageGVRs { if synchro, ok := s.storageResourceSynchros.Load(storageGVR); ok { select { - case <-synchro.(*ResourceSynchro).Close(): + case <-synchro.(resourcesynchro.Synchro).Close(): case <-s.closer: return } @@ -475,7 +489,7 @@ func (s *ClusterSynchro) runner() { go s.dynamicDiscovery.Start(s.handlerStopCh) s.storageResourceSynchros.Range(func(_, value interface{}) bool { - go value.(*ResourceSynchro).Start(s.handlerStopCh) + go value.(resourcesynchro.Synchro).Start(s.handlerStopCh) return true }) }() @@ -542,12 +556,13 @@ func (s *ClusterSynchro) genClusterStatus() *clusterv1alpha2.ClusterStatus { gr := schema.GroupResource{Group: status.Group, Resource: resource.Name} storageGVR := cond.StorageGVR(gr) if value, ok := s.storageResourceSynchros.Load(storageGVR); ok { - synchro := value.(*ResourceSynchro) - if gr != synchro.syncResource.GroupResource() { - cond.SyncResource = synchro.syncResource.GroupResource().String() + synchro := value.(resourcesynchro.Synchro) + syncedGVR := synchro.GroupVersionResource() + if gr != syncedGVR.GroupResource() { + cond.SyncResource = syncedGVR.GroupResource().String() } - if cond.Version != synchro.syncResource.Version { - cond.SyncVersion = synchro.syncResource.Version + if cond.Version != syncedGVR.Version { + cond.SyncVersion = syncedGVR.Version } status := synchro.Status() diff --git a/pkg/synchromanager/clustersynchro/resource_synchro.go b/pkg/synchromanager/clustersynchro/default_resource_synchro.go similarity index 87% rename from pkg/synchromanager/clustersynchro/resource_synchro.go rename to pkg/synchromanager/clustersynchro/default_resource_synchro.go index 8f92f4e74..b6e8200e4 100644 --- a/pkg/synchromanager/clustersynchro/resource_synchro.go +++ b/pkg/synchromanager/clustersynchro/default_resource_synchro.go @@ -19,34 +19,16 @@ import ( metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store" clusterv1alpha2 "github.com/clusterpedia-io/api/cluster/v1alpha2" - kubestatemetrics "github.com/clusterpedia-io/clusterpedia/pkg/kube_state_metrics" "github.com/clusterpedia-io/clusterpedia/pkg/runtime/informer" "github.com/clusterpedia-io/clusterpedia/pkg/storage" - "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro/queue" "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/features" + "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro" + "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro/queue" "github.com/clusterpedia-io/clusterpedia/pkg/utils" clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature" ) -type ResourceSynchroConfig struct { - schema.GroupVersionResource - Kind string - - cache.ListerWatcher - runtime.ObjectConvertor - storage.ResourceStorage - - *kubestatemetrics.MetricsStore - - ResourceVersions map[string]interface{} - PageSizeForInformer int64 -} - -func (c ResourceSynchroConfig) GroupVersionKind() schema.GroupVersionKind { - return c.GroupVersionResource.GroupVersion().WithKind(c.Kind) -} - -type ResourceSynchro struct { +type resourceSynchro struct { cluster string example runtime.Object @@ -88,9 +70,13 @@ type ResourceSynchro struct { runningStage string } -func newResourceSynchro(cluster string, config ResourceSynchroConfig) *ResourceSynchro { +type DefaultResourceSynchroFactory struct{} + +var _ resourcesynchro.SynchroFactory = DefaultResourceSynchroFactory{} + +func (factory DefaultResourceSynchroFactory) NewResourceSynchro(cluster string, config resourcesynchro.Config) (resourcesynchro.Synchro, error) { storageConfig := config.ResourceStorage.GetStorageConfig() - synchro := &ResourceSynchro{ + synchro := &resourceSynchro{ cluster: cluster, syncResource: config.GroupVersionResource, storageResource: storageConfig.StorageResource, @@ -127,10 +113,26 @@ func newResourceSynchro(cluster string, config ResourceSynchroConfig) *ResourceS } synchro.setStatus(clusterv1alpha2.ResourceSyncStatusPending, "", "") - return synchro + return synchro, nil +} + +func (synchro *resourceSynchro) Stage() string { + return synchro.runningStage +} + +func (synchro *resourceSynchro) GroupVersionResource() schema.GroupVersionResource { + return synchro.syncResource +} + +func (synchro *resourceSynchro) StoragedGroupVersionResource() schema.GroupVersionResource { + return synchro.storageResource +} + +func (synchro *resourceSynchro) GetMetricsWriter() *metricsstore.MetricsWriter { + return synchro.metricsWriter } -func (synchro *ResourceSynchro) Run(shutdown <-chan struct{}) { +func (synchro *resourceSynchro) Run(shutdown <-chan struct{}) { defer close(synchro.closed) go func() { select { @@ -158,7 +160,7 @@ func (synchro *ResourceSynchro) Run(shutdown <-chan struct{}) { synchro.runningStage = "shutdown" } -func (synchro *ResourceSynchro) Close() <-chan struct{} { +func (synchro *resourceSynchro) Close() <-chan struct{} { synchro.closeOnce.Do(func() { close(synchro.closer) synchro.queue.Close() @@ -167,7 +169,7 @@ func (synchro *ResourceSynchro) Close() <-chan struct{} { return synchro.closed } -func (synchro *ResourceSynchro) Start(stopCh <-chan struct{}) { +func (synchro *resourceSynchro) Start(stopCh <-chan struct{}) { synchro.startlock.Lock() stopped := synchro.stopped // avoid race synchro.startlock.Unlock() @@ -288,7 +290,7 @@ func (synchro *ResourceSynchro) Start(stopCh <-chan struct{}) { const LastAppliedConfigurationAnnotation = "kubectl.kubernetes.io/last-applied-configuration" -func (synchro *ResourceSynchro) pruneObject(obj *unstructured.Unstructured) { +func (synchro *resourceSynchro) pruneObject(obj *unstructured.Unstructured) { if clusterpediafeature.FeatureGate.Enabled(features.PruneManagedFields) { obj.SetManagedFields(nil) } @@ -305,7 +307,7 @@ func (synchro *ResourceSynchro) pruneObject(obj *unstructured.Unstructured) { } } -func (synchro *ResourceSynchro) OnAdd(obj interface{}, isInInitialList bool) { +func (synchro *resourceSynchro) OnAdd(obj interface{}, isInInitialList bool) { if !synchro.isRunnableForStorage.Load() { return } @@ -327,7 +329,7 @@ func (synchro *ResourceSynchro) OnAdd(obj interface{}, isInInitialList bool) { _ = synchro.queue.Add(obj) } -func (synchro *ResourceSynchro) OnUpdate(_, obj interface{}) { +func (synchro *resourceSynchro) OnUpdate(_, obj interface{}) { if !synchro.isRunnableForStorage.Load() { return } @@ -342,7 +344,7 @@ func (synchro *ResourceSynchro) OnUpdate(_, obj interface{}) { _ = synchro.queue.Update(obj) } -func (synchro *ResourceSynchro) OnDelete(obj interface{}) { +func (synchro *resourceSynchro) OnDelete(obj interface{}) { if !synchro.isRunnableForStorage.Load() { return } @@ -357,9 +359,9 @@ func (synchro *ResourceSynchro) OnDelete(obj interface{}) { _ = synchro.queue.Delete(obj) } -func (synchro *ResourceSynchro) OnSync(obj interface{}) {} +func (synchro *resourceSynchro) OnSync(obj interface{}) {} -func (synchro *ResourceSynchro) processResources() { +func (synchro *resourceSynchro) processResources() { for { select { case <-synchro.closer: @@ -381,7 +383,7 @@ func (synchro *ResourceSynchro) processResources() { } } -func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) { +func (synchro *resourceSynchro) handleResourceEvent(event *queue.Event) { defer func() { _ = synchro.queue.Done(event) }() obj, ok := event.Object.(runtime.Object) @@ -476,7 +478,7 @@ func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) { } } -func (synchro *ResourceSynchro) setRunnableForStorage() { +func (synchro *resourceSynchro) setRunnableForStorage() { synchro.isRunnableForStorage.Store(true) synchro.forStorageLock.Lock() @@ -494,7 +496,7 @@ func (synchro *ResourceSynchro) setRunnableForStorage() { } } -func (synchro *ResourceSynchro) setStopForStorage() { +func (synchro *resourceSynchro) setStopForStorage() { synchro.isRunnableForStorage.Store(false) synchro.forStorageLock.Lock() @@ -512,7 +514,7 @@ func (synchro *ResourceSynchro) setStopForStorage() { } } -func (synchro *ResourceSynchro) convertToStorageVersion(obj runtime.Object) (runtime.Object, error) { +func (synchro *resourceSynchro) convertToStorageVersion(obj runtime.Object) (runtime.Object, error) { if synchro.syncResource == synchro.storageResource || synchro.convertor == nil { return obj, nil } @@ -535,7 +537,7 @@ func (synchro *ResourceSynchro) convertToStorageVersion(obj runtime.Object) (run return obj, nil } -func (synchro *ResourceSynchro) createOrUpdateResource(ctx context.Context, obj runtime.Object) error { +func (synchro *resourceSynchro) createOrUpdateResource(ctx context.Context, obj runtime.Object) error { err := synchro.storage.Create(ctx, synchro.cluster, obj) if genericstorage.IsExist(err) { return synchro.storage.Update(ctx, synchro.cluster, obj) @@ -543,7 +545,7 @@ func (synchro *ResourceSynchro) createOrUpdateResource(ctx context.Context, obj return err } -func (synchro *ResourceSynchro) updateOrCreateResource(ctx context.Context, obj runtime.Object) error { +func (synchro *resourceSynchro) updateOrCreateResource(ctx context.Context, obj runtime.Object) error { err := synchro.storage.Update(ctx, synchro.cluster, obj) if genericstorage.IsNotFound(err) { return synchro.storage.Create(ctx, synchro.cluster, obj) @@ -551,11 +553,11 @@ func (synchro *ResourceSynchro) updateOrCreateResource(ctx context.Context, obj return err } -func (synchro *ResourceSynchro) deleteResource(ctx context.Context, obj runtime.Object) error { +func (synchro *resourceSynchro) deleteResource(ctx context.Context, obj runtime.Object) error { return synchro.storage.Delete(ctx, synchro.cluster, obj) } -func (synchro *ResourceSynchro) setStatus(status string, reason, message string) { +func (synchro *resourceSynchro) setStatus(status string, reason, message string) { synchro.status.Store(clusterv1alpha2.ClusterResourceSyncCondition{ Status: status, Reason: reason, @@ -564,11 +566,11 @@ func (synchro *ResourceSynchro) setStatus(status string, reason, message string) }) } -func (synchro *ResourceSynchro) Status() clusterv1alpha2.ClusterResourceSyncCondition { +func (synchro *resourceSynchro) Status() clusterv1alpha2.ClusterResourceSyncCondition { return synchro.status.Load().(clusterv1alpha2.ClusterResourceSyncCondition) } -func (synchro *ResourceSynchro) ErrorHandler(r *informer.Reflector, err error) { +func (synchro *resourceSynchro) ErrorHandler(r *informer.Reflector, err error) { if err != nil { // TODO(iceber): Use `k8s.io/apimachinery/pkg/api/errors` to resolve the error type and update it to `status.Reason` synchro.setStatus(clusterv1alpha2.ResourceSyncStatusError, "ResourceWatchFailed", err.Error()) diff --git a/pkg/synchromanager/clustersynchro/queue/event.go b/pkg/synchromanager/resourcesynchro/queue/event.go similarity index 100% rename from pkg/synchromanager/clustersynchro/queue/event.go rename to pkg/synchromanager/resourcesynchro/queue/event.go diff --git a/pkg/synchromanager/clustersynchro/queue/event_test.go b/pkg/synchromanager/resourcesynchro/queue/event_test.go similarity index 100% rename from pkg/synchromanager/clustersynchro/queue/event_test.go rename to pkg/synchromanager/resourcesynchro/queue/event_test.go diff --git a/pkg/synchromanager/clustersynchro/queue/pressurequeue.go b/pkg/synchromanager/resourcesynchro/queue/pressurequeue.go similarity index 100% rename from pkg/synchromanager/clustersynchro/queue/pressurequeue.go rename to pkg/synchromanager/resourcesynchro/queue/pressurequeue.go diff --git a/pkg/synchromanager/clustersynchro/queue/queue.go b/pkg/synchromanager/resourcesynchro/queue/queue.go similarity index 100% rename from pkg/synchromanager/clustersynchro/queue/queue.go rename to pkg/synchromanager/resourcesynchro/queue/queue.go diff --git a/pkg/synchromanager/resourcesynchro/resource_synchro.go b/pkg/synchromanager/resourcesynchro/resource_synchro.go new file mode 100644 index 000000000..fc2c7f11c --- /dev/null +++ b/pkg/synchromanager/resourcesynchro/resource_synchro.go @@ -0,0 +1,49 @@ +package resourcesynchro + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" + metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store" + + clusterv1alpha2 "github.com/clusterpedia-io/api/cluster/v1alpha2" + kubestatemetrics "github.com/clusterpedia-io/clusterpedia/pkg/kube_state_metrics" + "github.com/clusterpedia-io/clusterpedia/pkg/storage" +) + +type SynchroFactory interface { + NewResourceSynchro(name string, config Config) (Synchro, error) +} + +type Synchro interface { + Run(shutdown <-chan struct{}) + Start(stopCh <-chan struct{}) + Close() <-chan struct{} + + GroupVersionResource() schema.GroupVersionResource + StoragedGroupVersionResource() schema.GroupVersionResource + + GetMetricsWriter() *metricsstore.MetricsWriter + + // TODO(iceber): Provide a more meaningful name for this method + Stage() string + Status() clusterv1alpha2.ClusterResourceSyncCondition +} + +type Config struct { + schema.GroupVersionResource + Kind string + + ListerWatcher cache.ListerWatcher + ObjectConvertor runtime.ObjectConvertor + + ResourceVersions map[string]interface{} + PageSizeForInformer int64 + + MetricsStore *kubestatemetrics.MetricsStore + ResourceStorage storage.ResourceStorage +} + +func (c Config) GroupVersionKind() schema.GroupVersionKind { + return c.GroupVersionResource.GroupVersion().WithKind(c.Kind) +}