diff --git a/e2e/testcases/otel_collector_test.go b/e2e/testcases/otel_collector_test.go index 913f5ead13..47d91e97de 100644 --- a/e2e/testcases/otel_collector_test.go +++ b/e2e/testcases/otel_collector_test.go @@ -17,6 +17,7 @@ package e2e import ( "context" "fmt" + "os/exec" "strings" "testing" "time" @@ -24,18 +25,23 @@ import ( monitoringv2 "cloud.google.com/go/monitoring/apiv3/v2" "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" "github.com/golang/protobuf/ptypes/timestamp" + "github.com/pkg/errors" "google.golang.org/api/iterator" + "google.golang.org/genproto/googleapis/api/metric" + "google.golang.org/genproto/googleapis/api/monitoredres" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "kpt.dev/configsync/e2e" "kpt.dev/configsync/e2e/nomostest" "kpt.dev/configsync/e2e/nomostest/ntopts" "kpt.dev/configsync/e2e/nomostest/retry" nomostesting "kpt.dev/configsync/e2e/nomostest/testing" - "kpt.dev/configsync/e2e/nomostest/testkubeclient" "kpt.dev/configsync/pkg/api/configsync" + "kpt.dev/configsync/pkg/core" "kpt.dev/configsync/pkg/kinds" + "kpt.dev/configsync/pkg/metrics" ocmetrics "kpt.dev/configsync/pkg/metrics" "kpt.dev/configsync/pkg/testing/fake" - "sigs.k8s.io/controller-runtime/pkg/client" ) const ( @@ -45,8 +51,24 @@ const ( GCMMetricPrefix = "custom.googleapis.com/opencensus/config_sync" ) -var GCMMetricTypes = []string{ocmetrics.ReconcilerErrors.Name(), ocmetrics.PipelineError.Name(), ocmetrics.ReconcileDuration.Name(), ocmetrics.ParserDuration.Name(), ocmetrics.InternalErrors.Name()} +var GCMMetricTypes = []string{ + ocmetrics.ReconcilerErrors.Name(), + ocmetrics.PipelineError.Name(), + ocmetrics.ReconcileDuration.Name(), + ocmetrics.ParserDuration.Name(), + ocmetrics.InternalErrors.Name(), +} +// TestOtelCollectorDeployment validates that metrics reporting works for +// Google Cloud Monitoring using either workload identity or node identity. 
+// +// Requirements: +// - node identity: +// - node GSA with roles/monitoring.metricWriter IAM +// +// - workload identity: +// - e2e-test-metric-writer GSA with roles/monitoring.metricWriter IAM +// - roles/iam.workloadIdentityUser on config-management-monitoring/default for e2e-test-metric-writer func TestOtelCollectorDeployment(t *testing.T) { nt := nomostest.New(t, nomostesting.Reconciliation1, ntopts.RequireGKE(t)) nt.T.Cleanup(func() { @@ -54,7 +76,7 @@ func TestOtelCollectorDeployment(t *testing.T) { nt.PodLogs("config-management-monitoring", ocmetrics.OtelCollectorName, "", false) } }) - + setupMetricsServiceAccount(nt) nt.T.Cleanup(func() { nt.MustKubectl("delete", "-f", "../testdata/otel-collector/otel-cm-monarch-rejected-labels.yaml", "--ignore-not-found") nt.T.Log("Restart otel-collector pod to reset the ConfigMap and log") @@ -62,48 +84,30 @@ func TestOtelCollectorDeployment(t *testing.T) { if err := nt.Watcher.WatchForCurrentStatus(kinds.Deployment(), ocmetrics.OtelCollectorName, ocmetrics.MonitoringNamespace); err != nil { nt.T.Errorf("otel-collector pod failed to come up after a restart: %v", err) } - ksa := fake.ServiceAccountObject(DefaultMonitorKSA) - if err := nt.KubeClient.Get(DefaultMonitorKSA, ocmetrics.MonitoringNamespace, ksa); err != nil { - nt.T.Errorf("service account not found during cleanup: %v", err) - } - _, err := unsetAnnotation(nt.KubeClient, "iam.gke.io/gcp-service-account", ksa) - if err != nil { - nt.T.Errorf("failed to deannotate service account during cleanup: %v", err) - } }) - // If Workload Identity enabled on cluster, setup KSA to GSA annotation - if workloadPool, err := getWorkloadPool(nt); err != nil { - nt.T.Fatal(err) - } else if workloadPool != "" { - nt.T.Log(fmt.Sprintf("Workload identity enabled, adding KSA annotation to use %s service account", MonitorGSA)) - ksa := fake.ServiceAccountObject(DefaultMonitorKSA) - if err := nt.KubeClient.Get(DefaultMonitorKSA, ocmetrics.MonitoringNamespace, ksa); err != nil { - 
nt.T.Fatal(fmt.Sprintf("service account does not exist: %v", err)) - } - if _, err := setAnnotation(nt.KubeClient, "iam.gke.io/gcp-service-account", - fmt.Sprintf("%s@%s.iam.gserviceaccount.com", MonitorGSA, *e2e.GCPProject), ksa); err != nil { - nt.T.Fatal(err) - } - } - nt.T.Log("Restart otel-collector pod to refresh the ConfigMap, log and IAM") nomostest.DeletePodByLabel(nt, "app", ocmetrics.OpenTelemetry, false) if err := nt.Watcher.WatchForCurrentStatus(kinds.Deployment(), ocmetrics.OtelCollectorName, ocmetrics.MonitoringNamespace); err != nil { nt.T.Fatal(err) } + startTime := time.Now().UTC() + nt.T.Log("Adding test commit after otel-collector is started up so multiple commit hashes are processed in pipelines") namespace := fake.NamespaceObject("foo") nt.Must(nt.RootRepos[configsync.RootSyncName].Add("acme/ns.yaml", namespace)) + nt.Must(nt.RootRepos[configsync.RootSyncName].CommitAndPush("Adding foo namespace")) + if err := nt.WatchForAllSyncs(); err != nil { + nt.T.Fatal(err) + } nt.T.Log("Watch for metrics in GCM, timeout 2 minutes") - ctx := context.Background() + ctx := nt.Context client, err := createGCMClient(ctx) if err != nil { nt.T.Fatal(err) } - startTime := time.Now().UTC() // retry for 2 minutes until metric is accessible from GCM _, err = retry.Retry(120*time.Second, func() error { for _, metricType := range GCMMetricTypes { @@ -144,6 +148,121 @@ func TestOtelCollectorDeployment(t *testing.T) { } } +// TestOtelCollectorGCMLabelAggregation validates Google Cloud Monitoring +// metrics to ensure that the "commit" label is removed through aggregation in +// the otel-collector config. 
+// +// Requirements: +// - node identity: +// - node GSA with roles/monitoring.metricWriter IAM +// +// - workload identity: +// - e2e-test-metric-writer GSA with roles/monitoring.metricWriter IAM +// - roles/iam.workloadIdentityUser on config-management-monitoring/default for e2e-test-metric-writer +func TestOtelCollectorGCMLabelAggregation(t *testing.T) { + nt := nomostest.New(t, nomostesting.Reconciliation1, ntopts.RequireGKE(t)) + setupMetricsServiceAccount(nt) + + nt.T.Log("Restarting the otel-collector pod to refresh the service account") + nomostest.DeletePodByLabel(nt, "app", ocmetrics.OpenTelemetry, false) + if err := nt.Watcher.WatchForCurrentStatus(kinds.Deployment(), ocmetrics.OtelCollectorName, ocmetrics.MonitoringNamespace); err != nil { + nt.T.Fatal(err) + } + + startTime := time.Now().UTC() + + nt.T.Log("Adding test commit after otel-collector restart") + namespace := fake.NamespaceObject("foo") + nt.Must(nt.RootRepos[configsync.RootSyncName].Add("acme/ns.yaml", namespace)) + nt.Must(nt.RootRepos[configsync.RootSyncName].CommitAndPush("Adding foo namespace")) + if err := nt.WatchForAllSyncs(); err != nil { + nt.T.Fatal(err) + } + + // The following metrics are sent to GCM and aggregated to remove the "commit" label. + var metricsWithCommitLabel = []string{ + ocmetrics.LastSync.Name(), + ocmetrics.DeclaredResources.Name(), + ocmetrics.ApplyDuration.Name(), + // LastApply also has commit but is filtered by filter/cloudmonitoring. 
+ } + + nt.T.Log("Watch for metrics in GCM, timeout 2 minutes") + client, err := createGCMClient(nt.Context) + if err != nil { + nt.T.Fatal(err) + } + // Retry for up to 2 minutes until every listed metric is found in GCM without the "commit" label. + _, err = retry.Retry(120*time.Second, func() error { + for _, metricType := range metricsWithCommitLabel { + descriptor := fmt.Sprintf("%s/%s", GCMMetricPrefix, metricType) + it := listMetricInGCM(nt.Context, nt, client, startTime, descriptor) + if err := validateMetricInGCM(nt, it, descriptor, nt.ClusterName, metricDoesNotHaveLabel(metrics.KeyCommit.Name())); err != nil { + return err + } + } + return nil + }) + if err != nil { + nt.T.Fatal(err) + } +} + +func setupMetricsServiceAccount(nt *nomostest.NT) { + workloadPool, err := getWorkloadPool(nt) + if err != nil { + nt.T.Fatal(err) + } + // If Workload Identity enabled on cluster, setup KSA to GSA annotation. + // Otherwise, the node identity is used. + if workloadPool != "" { + gsaEmail := fmt.Sprintf("%s@%s.iam.gserviceaccount.com", MonitorGSA, *e2e.GCPProject) + // Validate that the GCP Service Account exists + _, err := describeGCPServiceAccount(nt, gsaEmail, *e2e.GCPProject) + if err != nil { + nt.T.Fatalf("failed to get service account for workload identity: %v", err) + } + + nt.T.Cleanup(func() { + ksa := &corev1.ServiceAccount{} + if err := nt.KubeClient.Get(DefaultMonitorKSA, ocmetrics.MonitoringNamespace, ksa); err != nil { + if apierrors.IsNotFound(err) { + return // no need to remove annotation + } + nt.T.Fatalf("failed to get service account during cleanup: %v", err) + } + core.RemoveAnnotations(ksa, "iam.gke.io/gcp-service-account") + if err := nt.KubeClient.Update(ksa); err != nil { + nt.T.Fatalf("failed to remove service account annotation during cleanup: %v", err) + } + }) + + nt.T.Logf("Workload identity enabled, adding KSA annotation to use %s service account", MonitorGSA) + ksa := &corev1.ServiceAccount{} + if err := nt.KubeClient.Get(DefaultMonitorKSA, ocmetrics.MonitoringNamespace, ksa); 
err != nil { + nt.T.Fatalf("failed to get service account: %v", err) + } + core.SetAnnotation(ksa, "iam.gke.io/gcp-service-account", gsaEmail) + if err := nt.KubeClient.Update(ksa); err != nil { + nt.T.Fatalf("failed to set service account annotation: %v", err) + } + if err := nt.WatchForAllSyncs(); err != nil { + nt.T.Fatal(err) + } + } +} + +func describeGCPServiceAccount(nt *nomostest.NT, gsaEmail, projectID string) ([]byte, error) { + args := []string{"iam", "service-accounts", "describe", gsaEmail, "--project", projectID} + nt.T.Logf("gcloud %s", strings.Join(args, " ")) + cmd := exec.Command("gcloud", args...) + out, err := cmd.CombinedOutput() + if err != nil { + return nil, errors.Errorf("failed to describe GCP service account: %s: %v\nstdout/stderr:\n%s", gsaEmail, err, string(out)) + } + return out, nil +} + func validateDeploymentLogHasFailure(nt *nomostest.NT, deployment, namespace, errorString string) error { nt.T.Helper() @@ -215,9 +334,26 @@ func listMetricInGCM(ctx context.Context, nt *nomostest.NT, client *monitoringv2 return client.ListTimeSeries(ctx, req) } +// metricValidatorFunc inspects the metric and monitored resource of one GCM +// time series and returns an error if the series fails validation. +type metricValidatorFunc func(*metric.Metric, *monitoredres.MonitoredResource) error + +// metricDoesNotHaveLabel returns a validator that fails when the metric's +// labels contain the given key. +func metricDoesNotHaveLabel(label string) metricValidatorFunc { + return func(m *metric.Metric, _ *monitoredres.MonitoredResource) error { + // Labels removed by aggregate_labels are metric labels, not resource labels. + labels := m.GetLabels() + if value, found := labels[label]; found { + return errors.Errorf("expected metric to not have label, but found %s=%s", label, value) + } + return nil + } +} + // Validates a metricType from a specific cluster_name can be found within given // TimeSeries -func validateMetricInGCM(nt *nomostest.NT, it *monitoringv2.TimeSeriesIterator, metricType, clusterName string) error { +func validateMetricInGCM(nt *nomostest.NT, it *monitoringv2.TimeSeriesIterator, metricType, clusterName string, valFns ...metricValidatorFunc) error { for { resp, err := it.Next() if err == iterator.Done { @@ -226,42 +362,24 @@ func validateMetricInGCM(nt *nomostest.NT, it 
*monitoringv2.TimeSeriesIterator, if err != nil { return err } - if resp.GetMetric().GetType() == metricType && - resp.GetResource().GetLabels()["cluster_name"] == clusterName { - return nil + metric := resp.GetMetric() + resource := resp.GetResource() + nt.Logger.Debugf(`GCM metric result: { "type": %q, "labels": %+v, "resource.type": %q, "resource.labels": %+v }`, + metric.GetType(), metric.GetLabels(), resource.GetType(), resource.GetLabels()) + if metric.GetType() == metricType { + labels := resource.GetLabels() + if labels["cluster_name"] == clusterName { + for _, valFn := range valFns { + if err := valFn(metric, resource); err != nil { + return errors.Wrapf(err, + "GCM metric %s failed validation (cluster_name=%s)", + metricType, clusterName) + } + } + return nil // first series for this metric type and cluster passed every validator + } } } - return fmt.Errorf("did not find target time series in GCM: type %s, cluster name %s", metricType, nt.ClusterName) -} - -func setAnnotation(client *testkubeclient.KubeClient, annotationKey, annotationValue string, obj client.Object) (bool, error) { - annotations := obj.GetAnnotations() - if annotations == nil { - annotations = map[string]string{} - } - if annotations[annotationKey] == annotationValue { - return false, nil - } - annotations[annotationKey] = annotationValue - obj.SetAnnotations(annotations) - if err := client.Update(obj); err != nil { - return false, fmt.Errorf("failed to apply updated object to cluster: %v", err) - } - return true, nil -} - -func unsetAnnotation(client *testkubeclient.KubeClient, annotationKey string, obj client.Object) (bool, error) { - annotations := obj.GetAnnotations() - if annotations == nil { - return false, nil - } - if _, exists := annotations[annotationKey]; !exists { - return false, nil - } - delete(annotations, annotationKey) - obj.SetAnnotations(annotations) - if err := client.Update(obj); err != nil { - return false, fmt.Errorf("failed to apply updated object to cluster: %v", err) - } - return true, nil + return fmt.Errorf("GCM metric %s not found 
(cluster_name=%s)", + metricType, clusterName) } diff --git a/go.mod b/go.mod index 90f4407b0d..db2a681eb7 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( golang.org/x/net v0.8.0 golang.org/x/oauth2 v0.6.0 google.golang.org/api v0.108.0 + google.golang.org/genproto v0.0.0-20230124163310-31e0e69b6fc2 gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.26.7 k8s.io/apiextensions-apiserver v0.26.7 @@ -136,7 +137,6 @@ require ( golang.org/x/time v0.3.0 // indirect gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20230124163310-31e0e69b6fc2 // indirect google.golang.org/grpc v1.51.0 // indirect google.golang.org/protobuf v1.29.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/pkg/metrics/otel.go b/pkg/metrics/otel.go index bb2e93011f..c28838a907 100644 --- a/pkg/metrics/otel.go +++ b/pkg/metrics/otel.go @@ -121,6 +121,39 @@ processors: - cluster_scoped_resource_count - resource_ns_count - api_duration_seconds + # Aggregate some metrics sent to Cloud Monitoring to remove high-cardinality labels (e.g. 
"commit") + metricstransform/cloudmonitoring: + transforms: + - include: last_sync_timestamp + action: update + operations: + - action: aggregate_labels + label_set: + - configsync.sync.kind + - configsync.sync.name + - configsync.sync.namespace + - status + aggregation_type: max + - include: declared_resources + action: update + new_name: current_declared_resources + operations: + - action: aggregate_labels + label_set: + - configsync.sync.kind + - configsync.sync.name + - configsync.sync.namespace + aggregation_type: max + - include: apply_duration_seconds + action: update + operations: + - action: aggregate_labels + label_set: + - configsync.sync.kind + - configsync.sync.name + - configsync.sync.namespace + - status + aggregation_type: max filter/kubernetes: metrics: include: @@ -250,7 +283,7 @@ service: pipelines: metrics/cloudmonitoring: receivers: [opencensus] - processors: [batch, filter/cloudmonitoring, resourcedetection] + processors: [batch, filter/cloudmonitoring, metricstransform/cloudmonitoring, resourcedetection] exporters: [googlecloud] metrics/prometheus: receivers: [opencensus] diff --git a/pkg/reconcilermanager/controllers/otel_controller_test.go b/pkg/reconcilermanager/controllers/otel_controller_test.go index 2973bbe6d4..26d411211e 100644 --- a/pkg/reconcilermanager/controllers/otel_controller_test.go +++ b/pkg/reconcilermanager/controllers/otel_controller_test.go @@ -46,7 +46,7 @@ const ( // otel-collector ConfigMap. // See `CollectorConfigGooglecloud` in `pkg/metrics/otel.go` // Used by TestOtelReconcilerGooglecloud. - depAnnotationGooglecloud = "4d0a24156f2d3d1bdc6a01ef41a3f625" + depAnnotationGooglecloud = "c3b8f7c8647aa20b219d141081ed7f6f" // depAnnotationGooglecloud is the expected hash of the custom // otel-collector ConfigMap test artifact. // Used by TestOtelReconcilerCustom.