Skip to content

Commit

Permalink
support for storage to implement custom resource synchro
Browse files Browse the repository at this point in the history
Signed-off-by: Iceber Gu <caiwei95@hotmail.com>
  • Loading branch information
Iceber committed Jun 18, 2024
1 parent 67c7ac1 commit e402630
Show file tree
Hide file tree
Showing 21 changed files with 130 additions and 67 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/dependentresource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"k8s.io/klog/v2"

policyv1alpha1 "github.com/clusterpedia-io/api/policy/v1alpha1"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro/informer"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro/informer"
)

type DependentResourceManager struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/discovery/controller/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"k8s.io/klog/v2"

"github.com/clusterpedia-io/clusterpedia/pkg/scheme"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro/informer"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro/informer"
)

type CRDController struct {
Expand Down
55 changes: 35 additions & 20 deletions pkg/synchromanager/clustersynchro/cluster_synchro.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (
kubestatemetrics "github.com/clusterpedia-io/clusterpedia/pkg/kube_state_metrics"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/storageconfig"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro/informer"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/features"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro/informer"
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
)

Expand All @@ -36,11 +37,12 @@ type ClusterSynchro struct {
RESTConfig *rest.Config
ClusterStatusUpdater ClusterStatusUpdater

storage storage.StorageFactory
syncConfig ClusterSyncConfig
healthChecker *healthChecker
dynamicDiscovery discovery.DynamicDiscoveryInterface
listerWatcherFactory informer.DynamicListerWatcherFactory
storage storage.StorageFactory
resourceSynchroFactory resourcesynchro.SynchroFactory
syncConfig ClusterSyncConfig
healthChecker *healthChecker
dynamicDiscovery discovery.DynamicDiscoveryInterface
listerWatcherFactory informer.DynamicListerWatcherFactory

closeOnce sync.Once
closer chan struct{}
Expand Down Expand Up @@ -123,6 +125,12 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, updat
storageResourceVersions: make(map[schema.GroupVersionResource]map[string]interface{}),
}

if factory, ok := storage.(resourcesynchro.SynchroFactory); ok {
synchro.resourceSynchroFactory = factory
} else {
synchro.resourceSynchroFactory = DefaultResourceSynchroFactory{}
}

var refresherOnce sync.Once
synchro.dynamicDiscovery.Prepare(discovery.PrepareConfig{
ResourceMutationHandler: synchro.resetSyncResources,
Expand Down Expand Up @@ -167,8 +175,9 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, updat

func (s *ClusterSynchro) GetMetricsWriterList() (writers metricsstore.MetricsWriterList) {
s.storageResourceSynchros.Range(func(_, value interface{}) bool {
if synchro := value.(*ResourceSynchro); synchro.metricsWriter != nil {
writers = append(writers, synchro.metricsWriter)
synchro := value.(resourcesynchro.Synchro)
if writer := synchro.GetMetricsWriter(); writer != nil {
writers = append(writers, writer)
}
return true
})
Expand Down Expand Up @@ -237,15 +246,15 @@ func (s *ClusterSynchro) Shutdown(updateStatus bool) {
shutdownCount := 0
statuses := make(map[string][]string)
s.storageResourceSynchros.Range(func(key, value interface{}) bool {
synchro := value.(*ResourceSynchro)
synchro := value.(resourcesynchro.Synchro)
status := synchro.Status()
if status.Status == clusterv1alpha2.ResourceSyncStatusStop && status.Reason == "" {
shutdownCount++
return true
}

gvr := key.(schema.GroupVersionResource)
sr := fmt.Sprintf("%s,%s,%s", status.Status, status.Reason, synchro.runningStage)
sr := fmt.Sprintf("%s,%s,%s", status.Status, status.Reason, synchro.Stage())
msg := fmt.Sprintf("%s/%s/%s", gvr.Group, gvr.Version, gvr.Resource)
statuses[sr] = append(statuses[sr], msg)
return true
Expand Down Expand Up @@ -359,18 +368,23 @@ func (s *ClusterSynchro) refreshSyncResources() {
if s.syncConfig.MetricsStoreBuilder != nil {
metricsStore = s.syncConfig.MetricsStoreBuilder.GetMetricStore(s.name, config.syncResource)
}
synchro := newResourceSynchro(s.name,
ResourceSynchroConfig{
synchro, err := s.resourceSynchroFactory.NewResourceSynchro(s.name,
resourcesynchro.Config{
GroupVersionResource: config.syncResource,
Kind: config.kind,
ListerWatcher: s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource),
ObjectConvertor: config.convertor,
ResourceStorage: resourceStorage,
MetricsStore: metricsStore,
ResourceVersions: rvs,
PageSizeForInformer: s.syncConfig.PageSizeForResourceSync,
ResourceStorage: resourceStorage,
},
)
if err != nil {
klog.ErrorS(err, "Failed to create resource synchro", "cluster", s.name, "storage resource", storageGVR)
updateSyncConditions(storageGVR, clusterv1alpha2.ResourceSyncStatusPending, "SynchroCreateFailed", fmt.Sprintf("new resource synchro failed: %s", err))
continue
}
s.waitGroup.StartWithChannel(s.closer, synchro.Run)
s.storageResourceSynchros.Store(storageGVR, synchro)

Expand Down Expand Up @@ -400,7 +414,7 @@ func (s *ClusterSynchro) refreshSyncResources() {
for storageGVR := range removedStorageGVRs {
if synchro, ok := s.storageResourceSynchros.Load(storageGVR); ok {
select {
case <-synchro.(*ResourceSynchro).Close():
case <-synchro.(resourcesynchro.Synchro).Close():
case <-s.closer:
return
}
Expand Down Expand Up @@ -475,7 +489,7 @@ func (s *ClusterSynchro) runner() {
go s.dynamicDiscovery.Start(s.handlerStopCh)

s.storageResourceSynchros.Range(func(_, value interface{}) bool {
go value.(*ResourceSynchro).Start(s.handlerStopCh)
go value.(resourcesynchro.Synchro).Start(s.handlerStopCh)
return true
})
}()
Expand Down Expand Up @@ -542,12 +556,13 @@ func (s *ClusterSynchro) genClusterStatus() *clusterv1alpha2.ClusterStatus {
gr := schema.GroupResource{Group: status.Group, Resource: resource.Name}
storageGVR := cond.StorageGVR(gr)
if value, ok := s.storageResourceSynchros.Load(storageGVR); ok {
synchro := value.(*ResourceSynchro)
if gr != synchro.syncResource.GroupResource() {
cond.SyncResource = synchro.syncResource.GroupResource().String()
synchro := value.(resourcesynchro.Synchro)
syncedGVR := synchro.GroupVersionResource()
if gr != syncedGVR.GroupResource() {
cond.SyncResource = syncedGVR.GroupResource().String()
}
if cond.Version != synchro.syncResource.Version {
cond.SyncVersion = synchro.syncResource.Version
if cond.Version != syncedGVR.Version {
cond.SyncVersion = syncedGVR.Version
}

status := synchro.Status()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,16 @@ import (
metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store"

clusterv1alpha2 "github.com/clusterpedia-io/api/cluster/v1alpha2"
kubestatemetrics "github.com/clusterpedia-io/clusterpedia/pkg/kube_state_metrics"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro/informer"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro/queue"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/features"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro/informer"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro/queue"
"github.com/clusterpedia-io/clusterpedia/pkg/utils"
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
)

type ResourceSynchroConfig struct {
schema.GroupVersionResource
Kind string

cache.ListerWatcher
runtime.ObjectConvertor
storage.ResourceStorage

*kubestatemetrics.MetricsStore

ResourceVersions map[string]interface{}
PageSizeForInformer int64
}

func (c ResourceSynchroConfig) GroupVersionKind() schema.GroupVersionKind {
return c.GroupVersionResource.GroupVersion().WithKind(c.Kind)
}

type ResourceSynchro struct {
type resourceSynchro struct {
cluster string

example runtime.Object
Expand Down Expand Up @@ -88,9 +70,11 @@ type ResourceSynchro struct {
runningStage string
}

func newResourceSynchro(cluster string, config ResourceSynchroConfig) *ResourceSynchro {
type DefaultResourceSynchroFactory struct{}

func (factory DefaultResourceSynchroFactory) NewResourceSynchro(cluster string, config resourcesynchro.Config) (resourcesynchro.Synchro, error) {
storageConfig := config.ResourceStorage.GetStorageConfig()
synchro := &ResourceSynchro{
synchro := &resourceSynchro{
cluster: cluster,
syncResource: config.GroupVersionResource,
storageResource: storageConfig.StorageGroupResource.WithVersion(storageConfig.StorageVersion.Version),
Expand Down Expand Up @@ -127,10 +111,26 @@ func newResourceSynchro(cluster string, config ResourceSynchroConfig) *ResourceS
}

synchro.setStatus(clusterv1alpha2.ResourceSyncStatusPending, "", "")
return synchro
return synchro, nil
}

func (synchro *resourceSynchro) Stage() string {
return synchro.runningStage
}

func (synchro *resourceSynchro) GroupVersionResource() schema.GroupVersionResource {
return synchro.syncResource
}

func (synchro *resourceSynchro) StoragedGroupVersionResource() schema.GroupVersionResource {
return synchro.storageResource
}

func (synchro *resourceSynchro) GetMetricsWriter() *metricsstore.MetricsWriter {
return synchro.metricsWriter
}

func (synchro *ResourceSynchro) Run(shutdown <-chan struct{}) {
func (synchro *resourceSynchro) Run(shutdown <-chan struct{}) {
defer close(synchro.closed)
go func() {
select {
Expand Down Expand Up @@ -158,7 +158,7 @@ func (synchro *ResourceSynchro) Run(shutdown <-chan struct{}) {
synchro.runningStage = "shutdown"
}

func (synchro *ResourceSynchro) Close() <-chan struct{} {
func (synchro *resourceSynchro) Close() <-chan struct{} {
synchro.closeOnce.Do(func() {
close(synchro.closer)
synchro.queue.Close()
Expand All @@ -167,7 +167,7 @@ func (synchro *ResourceSynchro) Close() <-chan struct{} {
return synchro.closed
}

func (synchro *ResourceSynchro) Start(stopCh <-chan struct{}) {
func (synchro *resourceSynchro) Start(stopCh <-chan struct{}) {
synchro.startlock.Lock()
stopped := synchro.stopped // avoid race
synchro.startlock.Unlock()
Expand Down Expand Up @@ -288,7 +288,7 @@ func (synchro *ResourceSynchro) Start(stopCh <-chan struct{}) {

const LastAppliedConfigurationAnnotation = "kubectl.kubernetes.io/last-applied-configuration"

func (synchro *ResourceSynchro) pruneObject(obj *unstructured.Unstructured) {
func (synchro *resourceSynchro) pruneObject(obj *unstructured.Unstructured) {
if clusterpediafeature.FeatureGate.Enabled(features.PruneManagedFields) {
obj.SetManagedFields(nil)
}
Expand All @@ -305,7 +305,7 @@ func (synchro *ResourceSynchro) pruneObject(obj *unstructured.Unstructured) {
}
}

func (synchro *ResourceSynchro) OnAdd(obj interface{}, isInInitialList bool) {
func (synchro *resourceSynchro) OnAdd(obj interface{}, isInInitialList bool) {
if !synchro.isRunnableForStorage.Load() {
return
}
Expand All @@ -327,7 +327,7 @@ func (synchro *ResourceSynchro) OnAdd(obj interface{}, isInInitialList bool) {
_ = synchro.queue.Add(obj)
}

func (synchro *ResourceSynchro) OnUpdate(_, obj interface{}) {
func (synchro *resourceSynchro) OnUpdate(_, obj interface{}) {
if !synchro.isRunnableForStorage.Load() {
return
}
Expand All @@ -342,7 +342,7 @@ func (synchro *ResourceSynchro) OnUpdate(_, obj interface{}) {
_ = synchro.queue.Update(obj)
}

func (synchro *ResourceSynchro) OnDelete(obj interface{}) {
func (synchro *resourceSynchro) OnDelete(obj interface{}) {
if !synchro.isRunnableForStorage.Load() {
return
}
Expand All @@ -357,9 +357,9 @@ func (synchro *ResourceSynchro) OnDelete(obj interface{}) {
_ = synchro.queue.Delete(obj)
}

func (synchro *ResourceSynchro) OnSync(obj interface{}) {}
func (synchro *resourceSynchro) OnSync(obj interface{}) {}

func (synchro *ResourceSynchro) processResources() {
func (synchro *resourceSynchro) processResources() {
for {
select {
case <-synchro.closer:
Expand All @@ -381,7 +381,7 @@ func (synchro *ResourceSynchro) processResources() {
}
}

func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) {
func (synchro *resourceSynchro) handleResourceEvent(event *queue.Event) {
defer func() { _ = synchro.queue.Done(event) }()

obj, ok := event.Object.(runtime.Object)
Expand Down Expand Up @@ -476,7 +476,7 @@ func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) {
}
}

func (synchro *ResourceSynchro) setRunnableForStorage() {
func (synchro *resourceSynchro) setRunnableForStorage() {
synchro.isRunnableForStorage.Store(true)

synchro.forStorageLock.Lock()
Expand All @@ -494,7 +494,7 @@ func (synchro *ResourceSynchro) setRunnableForStorage() {
}
}

func (synchro *ResourceSynchro) setStopForStorage() {
func (synchro *resourceSynchro) setStopForStorage() {
synchro.isRunnableForStorage.Store(false)

synchro.forStorageLock.Lock()
Expand All @@ -512,7 +512,7 @@ func (synchro *ResourceSynchro) setStopForStorage() {
}
}

func (synchro *ResourceSynchro) convertToStorageVersion(obj runtime.Object) (runtime.Object, error) {
func (synchro *resourceSynchro) convertToStorageVersion(obj runtime.Object) (runtime.Object, error) {
if synchro.syncResource == synchro.storageResource || synchro.convertor == nil {
return obj, nil
}
Expand All @@ -535,27 +535,27 @@ func (synchro *ResourceSynchro) convertToStorageVersion(obj runtime.Object) (run
return obj, nil
}

func (synchro *ResourceSynchro) createOrUpdateResource(ctx context.Context, obj runtime.Object) error {
func (synchro *resourceSynchro) createOrUpdateResource(ctx context.Context, obj runtime.Object) error {
err := synchro.storage.Create(ctx, synchro.cluster, obj)
if genericstorage.IsExist(err) {
return synchro.storage.Update(ctx, synchro.cluster, obj)
}
return err
}

func (synchro *ResourceSynchro) updateOrCreateResource(ctx context.Context, obj runtime.Object) error {
func (synchro *resourceSynchro) updateOrCreateResource(ctx context.Context, obj runtime.Object) error {
err := synchro.storage.Update(ctx, synchro.cluster, obj)
if genericstorage.IsNotFound(err) {
return synchro.storage.Create(ctx, synchro.cluster, obj)
}
return err
}

func (synchro *ResourceSynchro) deleteResource(ctx context.Context, obj runtime.Object) error {
func (synchro *resourceSynchro) deleteResource(ctx context.Context, obj runtime.Object) error {
return synchro.storage.Delete(ctx, synchro.cluster, obj)
}

func (synchro *ResourceSynchro) setStatus(status string, reason, message string) {
func (synchro *resourceSynchro) setStatus(status string, reason, message string) {
synchro.status.Store(clusterv1alpha2.ClusterResourceSyncCondition{
Status: status,
Reason: reason,
Expand All @@ -564,11 +564,11 @@ func (synchro *ResourceSynchro) setStatus(status string, reason, message string)
})
}

func (synchro *ResourceSynchro) Status() clusterv1alpha2.ClusterResourceSyncCondition {
func (synchro *resourceSynchro) Status() clusterv1alpha2.ClusterResourceSyncCondition {
return synchro.status.Load().(clusterv1alpha2.ClusterResourceSyncCondition)
}

func (synchro *ResourceSynchro) ErrorHandler(r *informer.Reflector, err error) {
func (synchro *resourceSynchro) ErrorHandler(r *informer.Reflector, err error) {
if err != nil {
// TODO(iceber): Use `k8s.io/apimachinery/pkg/api/errors` to resolve the error type and update it to `status.Reason`
synchro.setStatus(clusterv1alpha2.ResourceSyncStatusError, "ResourceWatchFailed", err.Error())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"k8s.io/utils/clock"
"k8s.io/utils/trace"

clspager "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro/informer/pager"
clspager "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro/informer/pager"
)

const defaultExpectedTypeName = "<unspecified>"
Expand Down
Loading

0 comments on commit e402630

Please sign in to comment.