From bdd40728372d51464fbdcf1de7ee6e71192f7e40 Mon Sep 17 00:00:00 2001 From: Iceber Gu Date: Mon, 24 Jun 2024 11:15:23 +0800 Subject: [PATCH] replace storage.ResourceStorageConfig with ResourceConfig Signed-off-by: Iceber Gu --- .../clusterpedia/collectionresources/rest.go | 10 +- pkg/kube_state_metrics/metrics_store.go | 17 ++- pkg/kubeapiserver/restmanager.go | 24 ++-- .../internalstorage/resource_storage.go | 89 ++++++------- .../internalstorage/resource_storage_test.go | 21 +-- pkg/storage/internalstorage/storage.go | 8 +- .../memorystorage/memory_resource_storage.go | 7 +- pkg/storage/memorystorage/memory_storage.go | 8 +- pkg/storage/storage.go | 11 +- pkg/storageconfig/storageconfig_factory.go | 123 ------------------ .../clustersynchro/cluster_synchro.go | 6 +- .../clustersynchro/resource_negotiator.go | 28 ++-- .../clustersynchro/resource_synchro.go | 4 +- 13 files changed, 108 insertions(+), 248 deletions(-) delete mode 100644 pkg/storageconfig/storageconfig_factory.go diff --git a/pkg/apiserver/registry/clusterpedia/collectionresources/rest.go b/pkg/apiserver/registry/clusterpedia/collectionresources/rest.go index 0e52bee3a..79a4c4b87 100644 --- a/pkg/apiserver/registry/clusterpedia/collectionresources/rest.go +++ b/pkg/apiserver/registry/clusterpedia/collectionresources/rest.go @@ -20,8 +20,8 @@ import ( internal "github.com/clusterpedia-io/api/clusterpedia" "github.com/clusterpedia-io/api/clusterpedia/scheme" "github.com/clusterpedia-io/api/clusterpedia/v1beta1" + resourceconfigfactory "github.com/clusterpedia-io/clusterpedia/pkg/runtime/resourceconfig/factory" "github.com/clusterpedia-io/clusterpedia/pkg/storage" - "github.com/clusterpedia-io/clusterpedia/pkg/storageconfig" "github.com/clusterpedia-io/clusterpedia/pkg/utils" "github.com/clusterpedia-io/clusterpedia/pkg/utils/negotiation" "github.com/clusterpedia-io/clusterpedia/pkg/utils/request" @@ -48,7 +48,7 @@ func NewREST(serializer runtime.NegotiatedSerializer, factory storage.StorageFac list := &internal.CollectionResourceList{} storages := make(map[string]storage.CollectionResourceStorage, len(crs)) - configFactory := storageconfig.NewStorageConfigFactory() + configFactory := resourceconfigfactory.New() for _, cr := range crs { for irt := range cr.ResourceTypes { rt := &cr.ResourceTypes[irt] @@ -59,9 +59,9 @@ func NewREST(serializer runtime.NegotiatedSerializer, factory storage.StorageFac } *rt = internal.CollectionResourceType{ - Group: config.StorageGroupResource.Group, - Version: config.StorageVersion.Version, - Resource: config.StorageGroupResource.Resource, + Group: config.StorageResource.Group, + Version: config.StorageResource.Version, + Resource: config.StorageResource.Resource, } } } diff --git a/pkg/kube_state_metrics/metrics_store.go b/pkg/kube_state_metrics/metrics_store.go index 34d0b3b62..f7f0633ce 100644 --- a/pkg/kube_state_metrics/metrics_store.go +++ b/pkg/kube_state_metrics/metrics_store.go @@ -16,22 +16,22 @@ import ( "k8s.io/kube-state-metrics/v2/pkg/optin" "k8s.io/kube-state-metrics/v2/pkg/options" + resourceconfigfactory "github.com/clusterpedia-io/clusterpedia/pkg/runtime/resourceconfig/factory" "github.com/clusterpedia-io/clusterpedia/pkg/runtime/scheme" - "github.com/clusterpedia-io/clusterpedia/pkg/storageconfig" ) var ( - storageConfigFactory = storageconfig.NewStorageConfigFactory() - hubGVRs = make(map[schema.GroupVersionResource]schema.GroupVersionResource) + resourceConfigFactory = resourceconfigfactory.New() + hubGVRs = make(map[schema.GroupVersionResource]schema.GroupVersionResource) ) func init() { for gvr := range generators { - config, err := storageConfigFactory.NewLegacyResourceConfig(gvr.GroupResource(), false) + memory, err := resourceConfigFactory.MemoryResource(gvr) if err != nil { panic(err) } - hubGVRs[config.StorageGroupResource.WithVersion(config.MemoryVersion.Version)] = gvr + hubGVRs[memory] = gvr } } @@ -101,11 +101,10 @@ func (builder *MetricsStoreBuilder) GetMetricStore(cluster string, resource sche return nil } - config, err := storageConfigFactory.NewLegacyResourceConfig(resource.GroupResource(), false) + hub, err := resourceConfigFactory.MemoryResource(resource) if err != nil { return nil } - hub := config.StorageGroupResource.WithVersion(config.MemoryVersion.Version) metricsGVR, ok := hubGVRs[hub] if !ok { return nil @@ -134,11 +133,11 @@ func (builder *MetricsStoreBuilder) GetMetricStore(cluster string, resource sche return obj, nil } - hobj, err := scheme.LegacyResourceScheme.ConvertToVersion(obj.(runtime.Object), config.MemoryVersion) + hobj, err := scheme.LegacyResourceScheme.ConvertToVersion(obj.(runtime.Object), hub.GroupVersion()) if err != nil { return nil, err } - if metricsGVR.GroupVersion() == config.MemoryVersion { + if metricsGVR.GroupVersion() == hub.GroupVersion() { return hobj, nil } return scheme.LegacyResourceScheme.ConvertToVersion(hobj, metricsGVR.GroupVersion()) diff --git a/pkg/kubeapiserver/restmanager.go b/pkg/kubeapiserver/restmanager.go index 7da80fe37..83aa56a57 100644 --- a/pkg/kubeapiserver/restmanager.go +++ b/pkg/kubeapiserver/restmanager.go @@ -28,16 +28,16 @@ import ( "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/discovery" "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/printers" "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/resourcerest" + resourceconfigfactory "github.com/clusterpedia-io/clusterpedia/pkg/runtime/resourceconfig/factory" "github.com/clusterpedia-io/clusterpedia/pkg/runtime/scheme" unstructuredscheme "github.com/clusterpedia-io/clusterpedia/pkg/runtime/scheme/unstructured" "github.com/clusterpedia-io/clusterpedia/pkg/storage" - "github.com/clusterpedia-io/clusterpedia/pkg/storageconfig" ) type RESTManager struct { serializer runtime.NegotiatedSerializer storageFactory storage.StorageFactory - resourcetSorageConfig *storageconfig.StorageConfigFactory + resourceConfigFactory *resourceconfigfactory.ResourceConfigFactory equivalentResourceRegistry runtime.EquivalentResourceMapper lock sync.Mutex @@ -87,7 +87,7 @@ func NewRESTManager(serializer runtime.NegotiatedSerializer, storageMediaType st manager := &RESTManager{ serializer: serializer, storageFactory: storageFactory, - resourcetSorageConfig: storageconfig.NewStorageConfigFactory(), + resourceConfigFactory: resourceconfigfactory.New(), equivalentResourceRegistry: runtime.NewEquivalentResourceRegistry(), requestVerbs: requestVerbs, } @@ -270,12 +270,13 @@ func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVers } func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResource, kind string, namespaced bool) (*resourcerest.RESTStorage, error) { - storageConfig, err := m.resourcetSorageConfig.NewLegacyResourceConfig(gvr.GroupResource(), namespaced) + resourceConfig, err := m.resourceConfigFactory.NewLegacyResourceConfig(gvr.GroupResource(), namespaced) if err != nil { return nil, err } - resourceStorage, err := m.storageFactory.NewResourceStorage(storageConfig) + config := &storage.ResourceStorageConfig{ResourceConfig: *resourceConfig} + resourceStorage, err := m.storageFactory.NewResourceStorage(config) if err != nil { return nil, err } @@ -284,11 +285,11 @@ func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResour DefaultQualifiedResource: gvr.GroupResource(), NewFunc: func() runtime.Object { - obj, _ := scheme.LegacyResourceScheme.New(storageConfig.MemoryVersion.WithKind(kind)) + obj, _ := scheme.LegacyResourceScheme.New(resourceConfig.MemoryResource.GroupVersion().WithKind(kind)) return obj }, NewListFunc: func() runtime.Object { - obj, _ := scheme.LegacyResourceScheme.New(storageConfig.MemoryVersion.WithKind(kind + "List")) + obj, _ := scheme.LegacyResourceScheme.New(resourceConfig.MemoryResource.GroupVersion().WithKind(kind + "List")) return obj }, @@ -297,12 +298,13 @@ func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResour } func (m *RESTManager) genUnstructuredRESTStorage(gvr schema.GroupVersionResource, kind string, namespaced bool) (*resourcerest.RESTStorage, error) { - storageConfig, err := m.resourcetSorageConfig.NewUnstructuredConfig(gvr, namespaced) + resourceConfig, err := m.resourceConfigFactory.NewUnstructuredConfig(gvr, namespaced) if err != nil { return nil, err } - resourceStorage, err := m.storageFactory.NewResourceStorage(storageConfig) + config := &storage.ResourceStorageConfig{ResourceConfig: *resourceConfig} + resourceStorage, err := m.storageFactory.NewResourceStorage(config) if err != nil { return nil, err } @@ -310,12 +312,12 @@ func (m *RESTManager) genUnstructuredRESTStorage(gvr schema.GroupVersionResource return &resourcerest.RESTStorage{ NewFunc: func() runtime.Object { obj := &unstructured.Unstructured{} - obj.SetGroupVersionKind(storageConfig.MemoryVersion.WithKind(kind)) + obj.SetGroupVersionKind(resourceConfig.MemoryResource.GroupVersion().WithKind(kind)) return obj }, NewListFunc: func() runtime.Object { obj := &unstructured.UnstructuredList{} - obj.SetGroupVersionKind(storageConfig.MemoryVersion.WithKind(kind + "List")) + obj.SetGroupVersionKind(resourceConfig.MemoryResource.GroupVersion().WithKind(kind + "List")) return obj }, diff --git a/pkg/storage/internalstorage/resource_storage.go b/pkg/storage/internalstorage/resource_storage.go index a42f758c6..b59e18d48 100644 --- a/pkg/storage/internalstorage/resource_storage.go +++ b/pkg/storage/internalstorage/resource_storage.go @@ -27,20 +27,33 @@ import ( ) type ResourceStorage struct { - db *gorm.DB - codec runtime.Codec + groupResource schema.GroupResource - storageGroupResource schema.GroupResource - storageVersion schema.GroupVersion - memoryVersion schema.GroupVersion + db *gorm.DB + config storage.ResourceStorageConfig } func (s *ResourceStorage) GetStorageConfig() *storage.ResourceStorageConfig { - return &storage.ResourceStorageConfig{ - Codec: s.codec, - StorageGroupResource: s.storageGroupResource, - StorageVersion: s.storageVersion, - MemoryVersion: s.memoryVersion, + config := s.config + return &config +} + +func (s *ResourceStorage) gvrKeyMap() map[string]interface{} { + return map[string]interface{}{ + "group": s.config.StorageResource.Group, + "version": s.config.StorageResource.Version, + "resource": s.config.StorageResource.Resource, + } +} + +func (s *ResourceStorage) resourceKeyMap(cluster, namespace, name string) map[string]interface{} { + return map[string]interface{}{ + "cluster": cluster, + "group": s.config.StorageResource.Group, + "version": s.config.StorageResource.Version, + "resource": s.config.StorageResource.Resource, + "namespace": namespace, + "name": name, } } @@ -61,7 +74,7 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim } var buffer bytes.Buffer - if err := s.codec.Encode(obj, &buffer); err != nil { + if err := s.config.Codec.Encode(obj, &buffer); err != nil { return err } @@ -71,9 +84,9 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim UID: metaobj.GetUID(), Name: metaobj.GetName(), Namespace: metaobj.GetNamespace(), - Group: s.storageGroupResource.Group, - Resource: s.storageGroupResource.Resource, - Version: s.storageVersion.Version, + Group: s.config.StorageResource.Group, + Resource: s.config.StorageResource.Resource, + Version: s.config.StorageResource.Version, Kind: gvk.Kind, ResourceVersion: metaobj.GetResourceVersion(), Object: buffer.Bytes(), @@ -94,7 +107,7 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim } var buffer bytes.Buffer - if err := s.codec.Encode(obj, &buffer); err != nil { + if err := s.config.Codec.Encode(obj, &buffer); err != nil { return err } @@ -116,14 +129,9 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim updatedResource["deleted_at"] = sql.NullTime{Time: deletedAt.Time, Valid: true} } - result := s.db.WithContext(ctx).Model(&Resource{}).Where(map[string]interface{}{ - "cluster": cluster, - "group": s.storageGroupResource.Group, - "version": s.storageVersion.Version, - "resource": s.storageGroupResource.Resource, - "namespace": metaobj.GetNamespace(), - "name": metaobj.GetName(), - }).Updates(updatedResource) + result := s.db.WithContext(ctx).Model(&Resource{}). + Where(s.resourceKeyMap(cluster, metaobj.GetNamespace(), metaobj.GetName())). + Updates(updatedResource) return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error) } @@ -144,14 +152,7 @@ func (s *ResourceStorage) ConvertDeletedObject(obj interface{}) (runtime.Object, } func (s *ResourceStorage) deleteObject(cluster, namespace, name string) *gorm.DB { - return s.db.Model(&Resource{}).Where(map[string]interface{}{ - "cluster": cluster, - "group": s.storageGroupResource.Group, - "version": s.storageVersion.Version, - "resource": s.storageGroupResource.Resource, - "namespace": namespace, - "name": name, - }).Delete(&Resource{}) + return s.db.Model(&Resource{}).Where(s.resourceKeyMap(cluster, namespace, name)).Delete(&Resource{}) } func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtime.Object) error { @@ -167,14 +168,7 @@ func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtim } func (s *ResourceStorage) genGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB { - return s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(map[string]interface{}{ - "cluster": cluster, - "group": s.storageGroupResource.Group, - "version": s.storageVersion.Version, - "resource": s.storageGroupResource.Resource, - "namespace": namespace, - "name": name, - }) + return s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(s.resourceKeyMap(cluster, namespace, name)) } func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error { @@ -183,7 +177,7 @@ func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name stri return InterpretResourceDBError(cluster, namespace+"/"+name, result.Error) } - obj, _, err := s.codec.Decode(objects[0], nil, into) + obj, _, err := s.config.Codec.Decode(objects[0], nil, into) if err != nil { return err } @@ -199,12 +193,7 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna result = &ResourceMetadataList{} } - query := s.db.WithContext(ctx).Model(&Resource{}) - query = query.Where(map[string]interface{}{ - "group": s.storageGroupResource.Group, - "version": s.storageVersion.Version, - "resource": s.storageGroupResource.Resource, - }) + query := s.db.WithContext(ctx).Model(&Resource{}).Where(s.gvrKeyMap()) offset, amount, query, err := applyListOptionsToResourceQuery(s.db, query, opts) return offset, amount, query, result, err } @@ -216,7 +205,7 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o } if err := result.From(query); err != nil { - return InterpretDBError(s.storageGroupResource.String(), err) + return InterpretDBError(s.groupResource.String(), err) } objects := result.Items() @@ -246,7 +235,7 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o unstructuredList.Items = make([]unstructured.Unstructured, 0, len(objects)) for _, object := range objects { uObj := &unstructured.Unstructured{} - obj, err := object.ConvertTo(s.codec, uObj) + obj, err := object.ConvertTo(s.config.Codec, uObj) if err != nil { return err } @@ -283,7 +272,7 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o slice := reflect.MakeSlice(v.Type(), len(objects), len(objects)) expected := reflect.New(v.Type().Elem()).Interface().(runtime.Object) for i, object := range objects { - obj, err := object.ConvertTo(s.codec, expected.DeepCopyObject()) + obj, err := object.ConvertTo(s.config.Codec, expected.DeepCopyObject()) if err != nil { return err } @@ -294,7 +283,7 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o } func (s *ResourceStorage) Watch(_ context.Context, _ *internal.ListOptions) (watch.Interface, error) { - return nil, apierrors.NewMethodNotSupported(s.storageGroupResource, "watch") + return nil, apierrors.NewMethodNotSupported(s.groupResource, "watch") } func applyListOptionsToResourceQuery(db *gorm.DB, query *gorm.DB, opts *internal.ListOptions) (int64, *int64, *gorm.DB, error) { diff --git a/pkg/storage/internalstorage/resource_storage_test.go b/pkg/storage/internalstorage/resource_storage_test.go index 4e44168d9..0b1407324 100644 --- a/pkg/storage/internalstorage/resource_storage_test.go +++ b/pkg/storage/internalstorage/resource_storage_test.go @@ -16,7 +16,9 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" internal "github.com/clusterpedia-io/api/clusterpedia" - "github.com/clusterpedia-io/clusterpedia/pkg/storageconfig" + "github.com/clusterpedia-io/clusterpedia/pkg/runtime/resourceconfig" + resourceconfigfactory "github.com/clusterpedia-io/clusterpedia/pkg/runtime/resourceconfig/factory" + "github.com/clusterpedia-io/clusterpedia/pkg/storage" ) func testApplyListOptionsToResourceQuery(t *testing.T, name string, options *internal.ListOptions, expected expected) { @@ -376,14 +378,14 @@ func TestResourceStorage_Update(t *testing.T) { rs := newTestResourceStorage(db, appsv1.SchemeGroupVersion.WithResource("deployments")) - factory := storageconfig.NewStorageConfigFactory() + factory := resourceconfigfactory.New() require.NotNil(factory) config, err := factory.NewLegacyResourceConfig(schema.GroupResource{Group: appsv1.SchemeGroupVersion.Group, Resource: "deployments"}, true) require.NoError(err) require.NotNil(config) - rs.codec = config.Codec + rs.config = storage.ResourceStorageConfig{ResourceConfig: *config} trueRef := true obj := &appsv1.Deployment{ @@ -414,7 +416,7 @@ func TestResourceStorage_Update(t *testing.T) { require.Len(ownerRef, 1) var buffer bytes.Buffer - err = rs.codec.Encode(obj, &buffer) + err = rs.config.Codec.Encode(obj, &buffer) require.NoError(err) owner := metav1.GetControllerOfNoCopy(metaObj) @@ -463,10 +465,13 @@ func TestResourceStorage_Update(t *testing.T) { assert.NotEqual(resourcesAfterUpdates[0].Object, resourcesAfterCreation[0].Object) } -func newTestResourceStorage(db *gorm.DB, storageGVK schema.GroupVersionResource) *ResourceStorage { +func newTestResourceStorage(db *gorm.DB, storageResource schema.GroupVersionResource) *ResourceStorage { return &ResourceStorage{ - db: db, - storageGroupResource: storageGVK.GroupResource(), - storageVersion: storageGVK.GroupVersion(), + db: db, + config: storage.ResourceStorageConfig{ + ResourceConfig: resourceconfig.ResourceConfig{ + StorageResource: storageResource, + }, + }, } } diff --git a/pkg/storage/internalstorage/storage.go b/pkg/storage/internalstorage/storage.go index ce99ca99d..6bdc005a7 100644 --- a/pkg/storage/internalstorage/storage.go +++ b/pkg/storage/internalstorage/storage.go @@ -21,12 +21,10 @@ func (s *StorageFactory) GetSupportedRequestVerbs() []string { func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) { return &ResourceStorage{ - db: s.db, - codec: config.Codec, + groupResource: config.StorageResource.GroupResource(), - storageGroupResource: config.StorageGroupResource, - storageVersion: config.StorageVersion, - memoryVersion: config.MemoryVersion, + db: s.db, + config: *config, }, nil } diff --git a/pkg/storage/memorystorage/memory_resource_storage.go b/pkg/storage/memorystorage/memory_resource_storage.go index 6f7609c75..cacfb1e99 100644 --- a/pkg/storage/memorystorage/memory_resource_storage.go +++ b/pkg/storage/memorystorage/memory_resource_storage.go @@ -50,6 +50,7 @@ type ResourceStorage struct { CrvSynchro *cache.ClusterResourceVersionSynchro incoming chan ClusterWatchEvent storageConfig *storage.ResourceStorageConfig + memoryVersion schema.GroupVersion } func (s *ResourceStorage) GetStorageConfig() *storage.ResourceStorageConfig { @@ -62,7 +63,7 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim return err } - err = s.watchCache.Add(obj, cluster, resourceVersion, s.storageConfig.Codec, s.storageConfig.MemoryVersion) + err = s.watchCache.Add(obj, cluster, resourceVersion, s.storageConfig.Codec, s.memoryVersion) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to add watch event object (%#v) to store: %v", obj, err)) } @@ -76,7 +77,7 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim return err } - err = s.watchCache.Update(obj, cluster, resourceVersion, s.storageConfig.Codec, s.storageConfig.MemoryVersion) + err = s.watchCache.Update(obj, cluster, resourceVersion, s.storageConfig.Codec, s.memoryVersion) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to add watch event object (%#v) to store: %v", obj, err)) } @@ -108,7 +109,7 @@ func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtim return err } - err = s.watchCache.Delete(obj, cluster, resourceVersion, s.storageConfig.Codec, s.storageConfig.MemoryVersion) + err = s.watchCache.Delete(obj, cluster, resourceVersion, s.storageConfig.Codec, s.memoryVersion) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to add watch event object (%#v) to store: %v", obj, err)) } diff --git a/pkg/storage/memorystorage/memory_storage.go b/pkg/storage/memorystorage/memory_storage.go index 77209f538..0938d9162 100644 --- a/pkg/storage/memorystorage/memory_storage.go +++ b/pkg/storage/memorystorage/memory_storage.go @@ -22,12 +22,7 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi storages.Lock() defer storages.Unlock() - gvr := schema.GroupVersionResource{ - Group: config.GroupResource.Group, - Version: config.StorageVersion.Version, - Resource: config.GroupResource.Resource, - } - + gvr := config.StorageResource resourceStorage, ok := storages.resourceStorages[gvr] if ok { watchCache := resourceStorage.watchCache @@ -42,6 +37,7 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi Codec: config.Codec, watchCache: watchCache, storageConfig: config, + memoryVersion: config.MemoryResource.GroupVersion(), } storages.resourceStorages[gvr] = resourceStorage diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index a8df23fcf..e16e75e37 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -8,6 +8,7 @@ import ( "k8s.io/apimachinery/pkg/watch" internal "github.com/clusterpedia-io/api/clusterpedia" + "github.com/clusterpedia-io/clusterpedia/pkg/runtime/resourceconfig" ) type StorageFactory interface { @@ -46,15 +47,7 @@ type CollectionResourceStorage interface { } type ResourceStorageConfig struct { - Namespaced bool - - GroupResource schema.GroupResource - StorageGroupResource schema.GroupResource - - MemoryVersion schema.GroupVersion - StorageVersion schema.GroupVersion - - Codec runtime.Codec + resourceconfig.ResourceConfig } type storageRecoverableExceptionError struct { diff --git a/pkg/storageconfig/storageconfig_factory.go b/pkg/storageconfig/storageconfig_factory.go deleted file mode 100644 index f2d477729..000000000 --- a/pkg/storageconfig/storageconfig_factory.go +++ /dev/null @@ -1,123 +0,0 @@ -package storageconfig - -import ( - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/runtime/serializer/versioning" - "k8s.io/apiserver/pkg/server/resourceconfig" - serverstorage "k8s.io/apiserver/pkg/server/storage" - apisstorage "k8s.io/kubernetes/pkg/apis/storage" - - "github.com/clusterpedia-io/clusterpedia/pkg/runtime/scheme" - "github.com/clusterpedia-io/clusterpedia/pkg/storage" -) - -type StorageConfigFactory struct { - legacyResourceEncodingConfig serverstorage.ResourceEncodingConfig - - cohabitatingResources map[schema.GroupResource][]schema.GroupResource -} - -func NewStorageConfigFactory() *StorageConfigFactory { - resources := []schema.GroupVersionResource{ - apisstorage.Resource("csistoragecapacities").WithVersion("v1beta1"), - } - - resourceEncodingConfig := serverstorage.NewDefaultResourceEncodingConfig(scheme.LegacyResourceScheme) - resourceEncodingConfig = resourceconfig.MergeResourceEncodingConfigs(resourceEncodingConfig, resources) - - factory := &StorageConfigFactory{ - legacyResourceEncodingConfig: resourceEncodingConfig, - cohabitatingResources: make(map[schema.GroupResource][]schema.GroupResource), - } - - /* - factory.addCohabitatingResources(networking.Resource("networkpolicies"), extensions.Resource("networkpolicies")) - factory.addCohabitatingResources(apps.Resource("deployments"), extensions.Resource("deployments")) - factory.addCohabitatingResources(apps.Resource("daemonsets"), extensions.Resource("daemonsets")) - factory.addCohabitatingResources(apps.Resource("replicasets"), extensions.Resource("replicasets")) - factory.addCohabitatingResources(apicore.Resource("events"), events.Resource("events")) - factory.addCohabitatingResources(apicore.Resource("replicationcontrollers"), extensions.Resource("replicationcontrollers")) // to make scale subresources equivalent - factory.addCohabitatingResources(policy.Resource("podsecuritypolicies"), extensions.Resource("podsecuritypolicies")) - factory.addCohabitatingResources(networking.Resource("ingresses"), extensions.Resource("ingresses")) - */ - return factory -} - -/* -func (f *StorageConfigFactory) addCohabitatingResources(groupResources ...schema.GroupResource) { - for _, groupResource := range groupResources { - f.cohabitatingResources[groupResource] = groupResources - } -} -*/ - -func (g *StorageConfigFactory) GetStorageGroupResource(groupResource schema.GroupResource) schema.GroupResource { - if len(g.cohabitatingResources[groupResource]) != 0 { - return g.cohabitatingResources[groupResource][0] - } - return groupResource -} - -func (g *StorageConfigFactory) NewConfig(gvr schema.GroupVersionResource, namespaced bool) (*storage.ResourceStorageConfig, error) { - if scheme.LegacyResourceScheme.IsGroupRegistered(gvr.Group) { - return g.NewLegacyResourceConfig(gvr.GroupResource(), namespaced) - } - return g.NewUnstructuredConfig(gvr, namespaced) -} - -func (g *StorageConfigFactory) NewUnstructuredConfig(gvr schema.GroupVersionResource, namespaced bool) (*storage.ResourceStorageConfig, error) { - version := gvr.GroupVersion() - codec := versioning.NewCodec( - scheme.UnstructuredCodecs, - scheme.UnstructuredCodecs, - scheme.UnstructuredScheme, - scheme.UnstructuredScheme, - scheme.UnstructuredScheme, - nil, - version, - version, - "unstructuredObjectStorage", - ) - return &storage.ResourceStorageConfig{ - GroupResource: gvr.GroupResource(), - StorageGroupResource: gvr.GroupResource(), - Codec: codec, - StorageVersion: version, - MemoryVersion: version, - Namespaced: namespaced, - }, nil -} - -func (g *StorageConfigFactory) NewLegacyResourceConfig(gr schema.GroupResource, namespaced bool) (*storage.ResourceStorageConfig, error) { - chosenStorageResource := g.GetStorageGroupResource(gr) - - storageVersion, err := g.legacyResourceEncodingConfig.StorageEncodingFor(chosenStorageResource) - if err != nil { - return nil, err - } - memoryVersion, err := g.legacyResourceEncodingConfig.InMemoryEncodingFor(gr) - if err != nil { - return nil, err - } - - codecConfig := serverstorage.StorageCodecConfig{ - StorageMediaType: runtime.ContentTypeJSON, - StorageSerializer: scheme.LegacyResourceCodecs, - MemoryVersion: memoryVersion, - StorageVersion: storageVersion, - } - codec, _, err := serverstorage.NewStorageCodec(codecConfig) - if err != nil { - return nil, err - } - - return &storage.ResourceStorageConfig{ - GroupResource: gr, - StorageGroupResource: chosenStorageResource, - Codec: codec, - StorageVersion: codecConfig.StorageVersion, - MemoryVersion: memoryVersion, - Namespaced: namespaced, - }, nil -} diff --git a/pkg/synchromanager/clustersynchro/cluster_synchro.go b/pkg/synchromanager/clustersynchro/cluster_synchro.go index 09f82f387..d13107ea6 100644 --- a/pkg/synchromanager/clustersynchro/cluster_synchro.go +++ b/pkg/synchromanager/clustersynchro/cluster_synchro.go @@ -19,8 +19,8 @@ import ( kubestatemetrics "github.com/clusterpedia-io/clusterpedia/pkg/kube_state_metrics" "github.com/clusterpedia-io/clusterpedia/pkg/runtime/discovery" "github.com/clusterpedia-io/clusterpedia/pkg/runtime/informer" + resourceconfigfactory "github.com/clusterpedia-io/clusterpedia/pkg/runtime/resourceconfig/factory" "github.com/clusterpedia-io/clusterpedia/pkg/storage" - "github.com/clusterpedia-io/clusterpedia/pkg/storageconfig" "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/features" clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature" ) @@ -135,7 +135,7 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, updat synchro.resourceNegotiator = &ResourceNegotiator{ name: name, - resourceStorageConfig: storageconfig.NewStorageConfigFactory(), + resourceConfigFactory: resourceconfigfactory.New(), dynamicDiscovery: synchro.dynamicDiscovery, } synchro.groupResourceStatus.Store((*GroupResourceStatus)(nil)) @@ -342,7 +342,7 @@ func (s *ClusterSynchro) refreshSyncResources() { continue } - resourceStorage, err := s.storage.NewResourceStorage(config.storageConfig) + resourceStorage, err := s.storage.NewResourceStorage(config.resourceStorageConfig) if err != nil { klog.ErrorS(err, "Failed to create resource storage", "cluster", s.name, "storage resource", storageGVR) updateSyncConditions(storageGVR, clusterv1alpha2.ResourceSyncStatusPending, "SynchroCreateFailed", fmt.Sprintf("new resource storage failed: %s", err)) diff --git a/pkg/synchromanager/clustersynchro/resource_negotiator.go b/pkg/synchromanager/clustersynchro/resource_negotiator.go index 4f41c6d70..11c6e1f24 100644 --- a/pkg/synchromanager/clustersynchro/resource_negotiator.go +++ b/pkg/synchromanager/clustersynchro/resource_negotiator.go @@ -14,9 +14,9 @@ import ( clusterv1alpha2 "github.com/clusterpedia-io/api/cluster/v1alpha2" "github.com/clusterpedia-io/clusterpedia/pkg/runtime/discovery" + resourceconfigfactory "github.com/clusterpedia-io/clusterpedia/pkg/runtime/resourceconfig/factory" "github.com/clusterpedia-io/clusterpedia/pkg/runtime/scheme" "github.com/clusterpedia-io/clusterpedia/pkg/storage" - "github.com/clusterpedia-io/clusterpedia/pkg/storageconfig" "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/features" clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature" ) @@ -24,15 +24,15 @@ import ( type ResourceNegotiator struct { name string dynamicDiscovery discovery.DynamicDiscoveryInterface - resourceStorageConfig *storageconfig.StorageConfigFactory + resourceConfigFactory *resourceconfigfactory.ResourceConfigFactory syncAllCustomResources bool } type syncConfig struct { - kind string - syncResource schema.GroupVersionResource - convertor runtime.ObjectConvertor - storageConfig *storage.ResourceStorageConfig + kind string + syncResource schema.GroupVersionResource + convertor runtime.ObjectConvertor + resourceStorageConfig *storage.ResourceStorageConfig } func (negotiator *ResourceNegotiator) SetSyncAllCustomResources(sync bool) { @@ -124,7 +124,7 @@ func (negotiator *ResourceNegotiator) NegotiateSyncResources(syncResources []clu Reason: "SynchroCreating", } - storageConfig, err := negotiator.resourceStorageConfig.NewConfig(syncGVR, apiResource.Namespaced) + resourceConfig, err := negotiator.resourceConfigFactory.NewConfig(syncGVR, apiResource.Namespaced) if err != nil { syncCondition.Reason = "SynchroCreateFailed" syncCondition.Message = fmt.Sprintf("new resource storage config failed: %s", err) @@ -132,10 +132,10 @@ func (negotiator *ResourceNegotiator) NegotiateSyncResources(syncResources []clu continue } - storageGVR := storageConfig.StorageGroupResource.WithVersion(storageConfig.StorageVersion.Version) + storageGVR := resourceConfig.StorageResource syncCondition.StorageVersion = storageGVR.Version - if syncGR != storageConfig.StorageGroupResource { - syncCondition.StorageResource = storageConfig.StorageGroupResource.String() + if syncGR != storageGVR.GroupResource() { + syncCondition.StorageResource = storageGVR.GroupResource().String() } groupResourceStatus.addSyncCondition(syncGVR, syncCondition) @@ -151,10 +151,10 @@ func (negotiator *ResourceNegotiator) NegotiateSyncResources(syncResources []clu convertor = scheme.UnstructuredScheme } storageResourceSyncConfigs[storageGVR] = syncConfig{ - kind: apiResource.Kind, - syncResource: syncGVR, - storageConfig: storageConfig, - convertor: convertor, + kind: apiResource.Kind, + syncResource: syncGVR, + resourceStorageConfig: &storage.ResourceStorageConfig{ResourceConfig: *resourceConfig}, + convertor: convertor, } } } diff --git a/pkg/synchromanager/clustersynchro/resource_synchro.go b/pkg/synchromanager/clustersynchro/resource_synchro.go index 403a0b50a..8f92f4e74 100644 --- a/pkg/synchromanager/clustersynchro/resource_synchro.go +++ b/pkg/synchromanager/clustersynchro/resource_synchro.go @@ -93,7 +93,7 @@ func newResourceSynchro(cluster string, config ResourceSynchroConfig) *ResourceS synchro := &ResourceSynchro{ cluster: cluster, syncResource: config.GroupVersionResource, - storageResource: storageConfig.StorageGroupResource.WithVersion(storageConfig.StorageVersion.Version), + storageResource: storageConfig.StorageResource, pageSize: config.PageSizeForInformer, listerWatcher: config.ListerWatcher, @@ -104,7 +104,7 @@ func newResourceSynchro(cluster string, config ResourceSynchroConfig) *ResourceS storage: config.ResourceStorage, convertor: config.ObjectConvertor, - memoryVersion: storageConfig.MemoryVersion, + memoryVersion: storageConfig.MemoryResource.GroupVersion(), stopped: make(chan struct{}), isRunnableForStorage: atomic.NewBool(true),