Skip to content

Commit

Permalink
Merge pull request #174 from gianrubio/review-snapshots
Browse files Browse the repository at this point in the history
Review cronjob
  • Loading branch information
stevesloka authored Apr 27, 2018
2 parents 872dad5 + f365f37 commit dcf3eef
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 35 deletions.
1 change: 1 addition & 0 deletions example/example-es-cluster-azure.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ spec:
scheduler-enabled: false
bucket-name: elasticsnapshots99
cron-schedule: "@every 2m"
image: upmcenterprises/elasticsearch-cron:0.0.3
storage:
storage-class: default
resources:
Expand Down
1 change: 1 addition & 0 deletions example/example-es-cluster-hostpath.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ spec:
scheduler-enabled: false
bucket-name: elasticsnapshots99
cron-schedule: "@every 2m"
image: upmcenterprises/elasticsearch-cron:0.0.3
storage:
storage-class: hostpath-storage
1 change: 1 addition & 0 deletions example/example-es-cluster-minikube.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ spec:
scheduler-enabled: false
bucket-name: elasticsnapshots99
cron-schedule: "@every 2m"
image: upmcenterprises/elasticsearch-cron:0.0.3
storage:
type: standard
storage-class-version: volume.alpha.kubernetes.io/storage-class
Expand Down
1 change: 1 addition & 0 deletions example/example-es-cluster-rook.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ spec:
scheduler-enabled: false
bucket-name: elasticsnapshots99
cron-schedule: "@every 2m"
image: upmcenterprises/elasticsearch-cron:0.0.3
storage:
storage-class: rook-block
resources:
Expand Down
1 change: 1 addition & 0 deletions example/example-es-cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ spec:
scheduler-enabled: false
bucket-name: elasticsnapshots99
cron-schedule: "@every 2m"
image: upmcenterprises/elasticsearch-cron:0.0.3
storage:
type: gp2
storage-class-provisioner: kubernetes.io/aws-ebs
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/elasticsearchoperator/v1/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ type Snapshot struct {

// Authentication defines credentials for snapshot requests
Authentication Authentication `json:"authentication"`

// Defines the image to run cronjobs
Image string `json:"image"`
}

// Authentication defines credentials for snapshot requests
Expand Down Expand Up @@ -208,6 +211,7 @@ type Scheduler struct {
ElasticURL string
Namespace string
ClusterName string
Image string
}

// SchedulerAuthentication stores credentials used to authenticate against snapshot endpoint
Expand Down
17 changes: 9 additions & 8 deletions pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,15 @@ func (p *Processor) refreshClusters() error {
},
},
Scheduler: snapshot.New(
cluster.Spec.Scheduler.S3bucketName,
cluster.Spec.Scheduler.CronSchedule,
cluster.Spec.Scheduler.Enabled,
cluster.Spec.Scheduler.Auth.UserName,
cluster.Spec.Scheduler.Auth.Password,
cluster.Spec.Scheduler.ElasticURL,
cluster.Spec.Scheduler.ClusterName,
cluster.Spec.Scheduler.Namespace,
cluster.Spec.Snapshot.BucketName,
cluster.Spec.Snapshot.CronSchedule,
cluster.Spec.Snapshot.SchedulerEnabled,
cluster.Spec.Snapshot.Authentication.UserName,
cluster.Spec.Snapshot.Authentication.Password,
cluster.Spec.Snapshot.Image,
p.k8sclient.GetClientServiceNameFullDNS(cluster.ObjectMeta.Name, cluster.ObjectMeta.Namespace),
cluster.ObjectMeta.Name,
cluster.ObjectMeta.Namespace,
p.k8sclient.Kclient,
),
}
Expand Down
74 changes: 47 additions & 27 deletions pkg/snapshot/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package snapshot

import (
"fmt"
"net/http"

"k8s.io/client-go/kubernetes"

Expand All @@ -34,16 +35,16 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

enterprisesv1 "github.com/upmc-enterprises/elasticsearch-operator/pkg/apis/elasticsearchoperator/v1"

batchv1 "k8s.io/api/batch/v1"
v2alpha1 "k8s.io/api/batch/v2alpha1"
v1beta1 "k8s.io/api/batch/v1beta1"
apicore "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
)

const (
baseCronImage = "upmcenterprises/elasticsearch-cron:0.0.1"
CRON_ACTION_REPOSITORY = "create-repository"
CRON_ACTION_SNAPSHOT = "snapshot"
defaultCronImage = "upmcenterprises/elasticsearch-cron:0.0.3"
cronActionRepository = "create-repository"
cronActionSnapshot = "snapshot"
)

type Scheduler struct {
Expand All @@ -52,7 +53,12 @@ type Scheduler struct {
}

// New creates an instance of Scheduler
func New(bucketName, cronSchedule string, enabled bool, userName, password, elasticURL, clusterName, namespace string, kc kubernetes.Interface) *Scheduler {
func New(bucketName, cronSchedule string, enabled bool, userName, password, image,
elasticURL, clusterName, namespace string, kc kubernetes.Interface) *Scheduler {

if image == "" {
image = defaultCronImage
}

return &Scheduler{
Kclient: kc,
Expand All @@ -67,6 +73,7 @@ func New(bucketName, cronSchedule string, enabled bool, userName, password, elas
Namespace: namespace,
ClusterName: clusterName,
Enabled: enabled,
Image: image,
},
}
}
Expand All @@ -91,18 +98,12 @@ func (s *Scheduler) Init() error {
// CreateSnapshotRepository creates the snapshot repository cronjob
func (s *Scheduler) CreateSnapshotRepository() error {
// TODO: This should wait until the api goes green and cluster is healthy
if err := s.CreateCronJob(s.CRD.Namespace, s.CRD.ClusterName, CRON_ACTION_REPOSITORY, s.CRD.CronSchedule); err != nil {
return err
}
return nil
return s.CreateCronJob(s.CRD.Namespace, s.CRD.ClusterName, cronActionRepository, s.CRD.CronSchedule)
}

// CreateSnapshot creates snapshot cronjob
func (s *Scheduler) CreateSnapshot() error {
if err := s.CreateCronJob(s.CRD.Namespace, s.CRD.ClusterName, CRON_ACTION_SNAPSHOT, s.CRD.CronSchedule); err != nil {
return err
}
return nil
return s.CreateCronJob(s.CRD.Namespace, s.CRD.ClusterName, cronActionSnapshot, s.CRD.CronSchedule)
}

// Stop cleans up Cron
Expand All @@ -119,34 +120,51 @@ func (s *Scheduler) deleteJobs(namespace, clusterName string) {
LabelSelector: fmt.Sprintf("app=elasticsearch-operator,clusterName=%s", clusterName),
})

// ignore not found error
if err != nil {
logrus.Error("Could not delete Jobs! ", err)
if err.(*apierrors.StatusError).ErrStatus.Code != http.StatusNotFound {
logrus.Error("Could not delete Jobs! ", err)
}
}

}

// DeleteCronJob deletes a cron job
func (s *Scheduler) deleteCronJob(namespace, clusterName string) {
// Repository CronJob
snapshotName := getSnapshotname(clusterName, CRON_ACTION_REPOSITORY)
err := s.Kclient.BatchV2alpha1().CronJobs(namespace).Delete(snapshotName, &metav1.DeleteOptions{})
snapshotName := getSnapshotname(clusterName, cronActionRepository)
err := s.Kclient.BatchV1beta1().CronJobs(namespace).Delete(snapshotName, &metav1.DeleteOptions{})

// ignore not found error
if err != nil {
logrus.Error("Could not delete Repository CronJob! ", err)
if _, ok := err.(*apierrors.StatusError); ok {
if err.(*apierrors.StatusError).ErrStatus.Code != http.StatusNotFound {
logrus.Error("Could not delete Repository CronJob! ", err)
}
}
}

// Snapshot CronJob
snapshotName = getSnapshotname(clusterName, CRON_ACTION_SNAPSHOT)
err = s.Kclient.BatchV2alpha1().CronJobs(namespace).Delete(snapshotName, &metav1.DeleteOptions{})
snapshotName = getSnapshotname(clusterName, cronActionSnapshot)
err = s.Kclient.BatchV1beta1().CronJobs(namespace).Delete(snapshotName, &metav1.DeleteOptions{})

// ignore not found error
if err != nil {
logrus.Error("Could not delete CronJob! ", err)
if _, ok := err.(*apierrors.StatusError); ok {
if err.(*apierrors.StatusError).ErrStatus.Code != http.StatusNotFound {
logrus.Error("Could not delete CronJob! ", err)
}
}
}

}

// CreateCronJob creates a cron job
func (s *Scheduler) CreateCronJob(namespace, clusterName, action, cronSchedule string) error {
snapshotName := getSnapshotname(clusterName, action)

// Check if CronJob exists
cronJob, err := s.Kclient.BatchV2alpha1().CronJobs(namespace).Get(snapshotName, metav1.GetOptions{})
cronJob, err := s.Kclient.BatchV1beta1().CronJobs(namespace).Get(snapshotName, metav1.GetOptions{})

if len(cronJob.Name) == 0 {

Expand All @@ -159,7 +177,7 @@ func (s *Scheduler) CreateCronJob(namespace, clusterName, action, cronSchedule s
if err == nil {
return err
}
job := &v2alpha1.CronJob{
job := &v1beta1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: snapshotName,
Labels: map[string]string{
Expand All @@ -168,9 +186,9 @@ func (s *Scheduler) CreateCronJob(namespace, clusterName, action, cronSchedule s
"name": snapshotName,
},
},
Spec: v2alpha1.CronJobSpec{
Spec: v1beta1.CronJobSpec{
Schedule: cronSchedule,
JobTemplate: v2alpha1.JobTemplateSpec{
JobTemplate: v1beta1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Template: apicore.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -185,7 +203,7 @@ func (s *Scheduler) CreateCronJob(namespace, clusterName, action, cronSchedule s
Containers: []apicore.Container{
apicore.Container{
Name: snapshotName,
Image: baseCronImage,
Image: s.CRD.Image,
ImagePullPolicy: "Always",
Resources: apicore.ResourceRequirements{
Requests: apicore.ResourceList{
Expand All @@ -209,14 +227,16 @@ func (s *Scheduler) CreateCronJob(namespace, clusterName, action, cronSchedule s
},
}

if _, err := s.Kclient.BatchV2alpha1().CronJobs(namespace).Create(job); err != nil {
if _, err := s.Kclient.BatchV1beta1().CronJobs(namespace).Create(job); err != nil {
logrus.Error("Could not create CronJob! ", err)
return err
}
} else if err != nil {
logrus.Error("Could not get cron job! ", err)
return err
}
logrus.Infof("CronJob %v succesfully created ! ", snapshotName)

return nil
}

Expand Down

0 comments on commit dcf3eef

Please sign in to comment.