Skip to content

Commit

Permalink
Aggregate commit label for GCM metrics (#825)
Browse files Browse the repository at this point in the history
This should reduce the number of unique metrics recorded and the
cost of using Config Sync with Google Cloud Monitoring.
Prometheus metrics still include the commit, to facilitate metrics
validation.
  • Loading branch information
karlkfi authored and google-oss-prow[bot] committed Aug 30, 2023
1 parent 47d67a6 commit 8ab20cc
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 69 deletions.
245 changes: 179 additions & 66 deletions e2e/testcases/otel_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,31 @@ package e2e
import (
"context"
"fmt"
"os/exec"
"strings"
"testing"
"time"

monitoringv2 "cloud.google.com/go/monitoring/apiv3/v2"
"cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/pkg/errors"
"google.golang.org/api/iterator"
"google.golang.org/genproto/googleapis/api/metric"
"google.golang.org/genproto/googleapis/api/monitoredres"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"kpt.dev/configsync/e2e"
"kpt.dev/configsync/e2e/nomostest"
"kpt.dev/configsync/e2e/nomostest/ntopts"
"kpt.dev/configsync/e2e/nomostest/retry"
nomostesting "kpt.dev/configsync/e2e/nomostest/testing"
"kpt.dev/configsync/e2e/nomostest/testkubeclient"
"kpt.dev/configsync/pkg/api/configsync"
"kpt.dev/configsync/pkg/core"
"kpt.dev/configsync/pkg/kinds"
"kpt.dev/configsync/pkg/metrics"
ocmetrics "kpt.dev/configsync/pkg/metrics"
"kpt.dev/configsync/pkg/testing/fake"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
Expand All @@ -45,65 +51,63 @@ const (
GCMMetricPrefix = "custom.googleapis.com/opencensus/config_sync"
)

var GCMMetricTypes = []string{ocmetrics.ReconcilerErrors.Name(), ocmetrics.PipelineError.Name(), ocmetrics.ReconcileDuration.Name(), ocmetrics.ParserDuration.Name(), ocmetrics.InternalErrors.Name()}
// GCMMetricTypes lists the names of the Config Sync metrics that the tests in
// this file look up in Google Cloud Monitoring. Each name is taken from the
// corresponding view exported by the ocmetrics package.
var GCMMetricTypes = []string{
ocmetrics.ReconcilerErrors.Name(),
ocmetrics.PipelineError.Name(),
ocmetrics.ReconcileDuration.Name(),
ocmetrics.ParserDuration.Name(),
ocmetrics.InternalErrors.Name(),
}

// TestOtelCollectorDeployment validates that metrics reporting works for
// Google Cloud Monitoring using either workload identity or node identity.
//
// Requirements:
// - node identity:
// - node GSA with roles/monitoring.metricWriter IAM
//
// - workload identity:
// - e2e-test-metric-writer GSA with roles/monitoring.metricWriter IAM
// - roles/iam.workloadIdentityUser on config-management-monitoring/default for e2e-test-metric-writer
func TestOtelCollectorDeployment(t *testing.T) {
nt := nomostest.New(t, nomostesting.Reconciliation1, ntopts.RequireGKE(t))
nt.T.Cleanup(func() {
if t.Failed() {
nt.PodLogs("config-management-monitoring", ocmetrics.OtelCollectorName, "", false)
}
})

setupMetricsServiceAccount(nt)
nt.T.Cleanup(func() {
nt.MustKubectl("delete", "-f", "../testdata/otel-collector/otel-cm-monarch-rejected-labels.yaml", "--ignore-not-found")
nt.T.Log("Restart otel-collector pod to reset the ConfigMap and log")
nomostest.DeletePodByLabel(nt, "app", ocmetrics.OpenTelemetry, false)
if err := nt.Watcher.WatchForCurrentStatus(kinds.Deployment(), ocmetrics.OtelCollectorName, ocmetrics.MonitoringNamespace); err != nil {
nt.T.Errorf("otel-collector pod failed to come up after a restart: %v", err)
}
ksa := fake.ServiceAccountObject(DefaultMonitorKSA)
if err := nt.KubeClient.Get(DefaultMonitorKSA, ocmetrics.MonitoringNamespace, ksa); err != nil {
nt.T.Errorf("service account not found during cleanup: %v", err)
}
_, err := unsetAnnotation(nt.KubeClient, "iam.gke.io/gcp-service-account", ksa)
if err != nil {
nt.T.Errorf("failed to deannotate service account during cleanup: %v", err)
}
})

// If Workload Identity enabled on cluster, setup KSA to GSA annotation
if workloadPool, err := getWorkloadPool(nt); err != nil {
nt.T.Fatal(err)
} else if workloadPool != "" {
nt.T.Log(fmt.Sprintf("Workload identity enabled, adding KSA annotation to use %s service account", MonitorGSA))
ksa := fake.ServiceAccountObject(DefaultMonitorKSA)
if err := nt.KubeClient.Get(DefaultMonitorKSA, ocmetrics.MonitoringNamespace, ksa); err != nil {
nt.T.Fatal(fmt.Sprintf("service account does not exist: %v", err))
}
if _, err := setAnnotation(nt.KubeClient, "iam.gke.io/gcp-service-account",
fmt.Sprintf("%s@%s.iam.gserviceaccount.com", MonitorGSA, *e2e.GCPProject), ksa); err != nil {
nt.T.Fatal(err)
}
}

nt.T.Log("Restart otel-collector pod to refresh the ConfigMap, log and IAM")
nomostest.DeletePodByLabel(nt, "app", ocmetrics.OpenTelemetry, false)
if err := nt.Watcher.WatchForCurrentStatus(kinds.Deployment(), ocmetrics.OtelCollectorName, ocmetrics.MonitoringNamespace); err != nil {
nt.T.Fatal(err)
}

startTime := time.Now().UTC()

nt.T.Log("Adding test commit after otel-collector is started up so multiple commit hashes are processed in pipelines")
namespace := fake.NamespaceObject("foo")
nt.Must(nt.RootRepos[configsync.RootSyncName].Add("acme/ns.yaml", namespace))
nt.Must(nt.RootRepos[configsync.RootSyncName].CommitAndPush("Adding foo namespace"))
if err := nt.WatchForAllSyncs(); err != nil {
nt.T.Fatal(err)
}

nt.T.Log("Watch for metrics in GCM, timeout 2 minutes")
ctx := context.Background()
ctx := nt.Context
client, err := createGCMClient(ctx)
if err != nil {
nt.T.Fatal(err)
}
startTime := time.Now().UTC()
// retry for 2 minutes until metric is accessible from GCM
_, err = retry.Retry(120*time.Second, func() error {
for _, metricType := range GCMMetricTypes {
Expand Down Expand Up @@ -144,6 +148,121 @@ func TestOtelCollectorDeployment(t *testing.T) {
}
}

// TestOtelCollectorGCMLabelAggregation validates that the metrics exported to
// Google Cloud Monitoring have had the "commit" label removed through
// aggregation in the otel-collector config.
//
// Requirements:
//   - node identity:
//   - node GSA with roles/monitoring.metricWriter IAM
//
//   - workload identity:
//   - e2e-test-metric-writer GSA with roles/monitoring.metricWriter IAM
//   - roles/iam.workloadIdentityUser on config-management-monitoring/default for e2e-test-metric-writer
func TestOtelCollectorGCMLabelAggregation(t *testing.T) {
	nt := nomostest.New(t, nomostesting.Reconciliation1, ntopts.RequireGKE(t))
	setupMetricsServiceAccount(nt)

	nt.T.Log("Restarting the otel-collector pod to refresh the service account")
	nomostest.DeletePodByLabel(nt, "app", ocmetrics.OpenTelemetry, false)
	if err := nt.Watcher.WatchForCurrentStatus(kinds.Deployment(), ocmetrics.OtelCollectorName, ocmetrics.MonitoringNamespace); err != nil {
		nt.T.Fatal(err)
	}

	// Captured after the collector restart and before the new commit is
	// pushed; it is handed to listMetricInGCM for every query below.
	startTime := time.Now().UTC()

	nt.T.Log("Adding test commit after otel-collector restart")
	namespace := fake.NamespaceObject("foo")
	nt.Must(nt.RootRepos[configsync.RootSyncName].Add("acme/ns.yaml", namespace))
	nt.Must(nt.RootRepos[configsync.RootSyncName].CommitAndPush("Adding foo namespace"))
	if err := nt.WatchForAllSyncs(); err != nil {
		nt.T.Fatal(err)
	}

	// The following metrics are sent to GCM and aggregated to remove the "commit" label.
	// NOTE(review): the collector's metricstransform/cloudmonitoring processor
	// appears to rename declared_resources to current_declared_resources;
	// confirm that DeclaredResources.Name() matches the name GCM receives.
	var metricsWithCommitLabel = []string{
		ocmetrics.LastSync.Name(),
		ocmetrics.DeclaredResources.Name(),
		ocmetrics.ApplyDuration.Name(),
		// LastApply also has commit but is filtered by filter/cloudmonitoring.
	}

	nt.T.Log("Watch for metrics in GCM, timeout 2 minutes")
	ctx := nt.Context
	client, err := createGCMClient(ctx)
	if err != nil {
		nt.T.Fatal(err)
	}
	// retry for 2 minutes until every metric is accessible from GCM
	_, err = retry.Retry(120*time.Second, func() error {
		for _, metricType := range metricsWithCommitLabel {
			descriptor := fmt.Sprintf("%s/%s", GCMMetricPrefix, metricType)
			it := listMetricInGCM(ctx, nt, client, startTime, descriptor)
			// Only bail out on failure. Returning the result unconditionally
			// would end the loop after the first metric and leave the
			// remaining ones unvalidated.
			if err := validateMetricInGCM(nt, it, descriptor, nt.ClusterName,
				metricDoesNotHaveLabel(metrics.KeyCommit.Name())); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		nt.T.Fatal(err)
	}
}

// setupMetricsServiceAccount prepares the identity used for metrics export.
//
// When the cluster reports a workload pool (Workload Identity enabled), it
// verifies that the GCP service account exists, annotates the monitoring
// Kubernetes service account with the GSA email, and waits for all syncs.
// A cleanup hook is registered to strip the annotation again when the test
// finishes. When no workload pool is reported nothing is changed and the node
// identity is used instead. Any failure aborts the test.
func setupMetricsServiceAccount(nt *nomostest.NT) {
	pool, poolErr := getWorkloadPool(nt)
	if poolErr != nil {
		nt.T.Fatal(poolErr)
	}
	if pool == "" {
		// Workload Identity is not enabled; fall back to the node identity.
		return
	}

	const gsaAnnotationKey = "iam.gke.io/gcp-service-account"
	gsaEmail := fmt.Sprintf("%s@%s.iam.gserviceaccount.com", MonitorGSA, *e2e.GCPProject)

	// Validate that the GCP Service Account exists
	if _, descErr := describeGCPServiceAccount(nt, gsaEmail, *e2e.GCPProject); descErr != nil {
		nt.T.Fatalf("failed to get service account for workload identity: %v", descErr)
	}

	// Register the cleanup before mutating the KSA so the annotation is
	// removed even if a later step fails.
	nt.T.Cleanup(func() {
		sa := &corev1.ServiceAccount{}
		getErr := nt.KubeClient.Get(DefaultMonitorKSA, ocmetrics.MonitoringNamespace, sa)
		if apierrors.IsNotFound(getErr) {
			return // no need to remove annotation
		}
		if getErr != nil {
			nt.T.Fatalf("failed to get service account during cleanup: %v", getErr)
		}
		core.RemoveAnnotations(sa, gsaAnnotationKey)
		if updErr := nt.KubeClient.Update(sa); updErr != nil {
			nt.T.Fatalf("failed to remove service account annotation during cleanup: %v", updErr)
		}
	})

	nt.T.Logf("Workload identity enabled, adding KSA annotation to use %s service account", MonitorGSA)
	sa := &corev1.ServiceAccount{}
	if getErr := nt.KubeClient.Get(DefaultMonitorKSA, ocmetrics.MonitoringNamespace, sa); getErr != nil {
		nt.T.Fatalf("failed to get service account: %v", getErr)
	}
	core.SetAnnotation(sa, gsaAnnotationKey, gsaEmail)
	if updErr := nt.KubeClient.Update(sa); updErr != nil {
		nt.T.Fatalf("failed to set service account annotation: %v", updErr)
	}
	if syncErr := nt.WatchForAllSyncs(); syncErr != nil {
		nt.T.Fatal(syncErr)
	}
}

// describeGCPServiceAccount runs `gcloud iam service-accounts describe` for
// gsaEmail in projectID and returns the command's combined stdout/stderr.
// The full command line is logged before it is executed. If gcloud fails, the
// returned slice is nil and the error carries the captured output.
func describeGCPServiceAccount(nt *nomostest.NT, gsaEmail, projectID string) ([]byte, error) {
	gcloudArgs := []string{"iam", "service-accounts", "describe", gsaEmail, "--project", projectID}
	nt.T.Logf("gcloud %s", strings.Join(gcloudArgs, " "))

	output, runErr := exec.Command("gcloud", gcloudArgs...).CombinedOutput()
	if runErr == nil {
		return output, nil
	}
	return nil, errors.Errorf("failed to describe GCP service account: %s: %v\nstdout/stderr:\n%s", gsaEmail, runErr, string(output))
}

func validateDeploymentLogHasFailure(nt *nomostest.NT, deployment, namespace, errorString string) error {
nt.T.Helper()

Expand Down Expand Up @@ -215,9 +334,21 @@ func listMetricInGCM(ctx context.Context, nt *nomostest.NT, client *monitoringv2
return client.ListTimeSeries(ctx, req)
}

// metricValidatorFunc inspects the metric and monitored resource of a single
// time series and returns a non-nil error when the series fails validation.
type metricValidatorFunc func(*metric.Metric, *monitoredres.MonitoredResource) error

// metricDoesNotHaveLabel returns a validator that fails when the given label
// key is present on a time series.
//
// The key is looked up in the metric's own labels and in the monitored
// resource's labels. Checking the resource labels alone is not enough: a label
// attached to the metric itself (for example "commit") would never be seen and
// the validator would always pass.
//
// The returned error names the offending label and its value.
func metricDoesNotHaveLabel(label string) metricValidatorFunc {
	return func(m *metric.Metric, r *monitoredres.MonitoredResource) error {
		// The generated getters are nil-safe and a lookup on a nil map is
		// fine, so missing metric/resource values simply yield "not found".
		for _, labels := range []map[string]string{m.GetLabels(), r.GetLabels()} {
			if value, found := labels[label]; found {
				return errors.Errorf("expected metric to not have label, but found %s=%s", label, value)
			}
		}
		return nil
	}
}

// Validates a metricType from a specific cluster_name can be found within given
// TimeSeries
func validateMetricInGCM(nt *nomostest.NT, it *monitoringv2.TimeSeriesIterator, metricType, clusterName string) error {
func validateMetricInGCM(nt *nomostest.NT, it *monitoringv2.TimeSeriesIterator, metricType, clusterName string, valFns ...metricValidatorFunc) error {
for {
resp, err := it.Next()
if err == iterator.Done {
Expand All @@ -226,42 +357,24 @@ func validateMetricInGCM(nt *nomostest.NT, it *monitoringv2.TimeSeriesIterator,
if err != nil {
return err
}
if resp.GetMetric().GetType() == metricType &&
resp.GetResource().GetLabels()["cluster_name"] == clusterName {
return nil
metric := resp.GetMetric()
resource := resp.GetResource()
nt.Logger.Debugf(`GCM metric result: { "type": %q, "labels": %+v, "resource.type": %q, "resource.labels": %+v }`,
metric.Type, metric.Labels, resource.Type, resource.Labels)
if metric.GetType() == metricType {
labels := resource.GetLabels()
if labels["cluster_name"] == clusterName {
for _, valFn := range valFns {
if err := valFn(metric, resource); err != nil {
return errors.Wrapf(err,
"GCM metric %s failed validation (cluster_name=%s)",
metricType, nt.ClusterName)
}
}
return nil
}
}
}
return fmt.Errorf("did not find target time series in GCM: type %s, cluster name %s", metricType, nt.ClusterName)
}

func setAnnotation(client *testkubeclient.KubeClient, annotationKey, annotationValue string, obj client.Object) (bool, error) {
annotations := obj.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
if annotations[annotationKey] == annotationValue {
return false, nil
}
annotations[annotationKey] = annotationValue
obj.SetAnnotations(annotations)
if err := client.Update(obj); err != nil {
return false, fmt.Errorf("failed to apply updated object to cluster: %v", err)
}
return true, nil
}

func unsetAnnotation(client *testkubeclient.KubeClient, annotationKey string, obj client.Object) (bool, error) {
annotations := obj.GetAnnotations()
if annotations == nil {
return false, nil
}
if _, exists := annotations[annotationKey]; !exists {
return false, nil
}
delete(annotations, annotationKey)
obj.SetAnnotations(annotations)
if err := client.Update(obj); err != nil {
return false, fmt.Errorf("failed to apply updated object to cluster: %v", err)
}
return true, nil
return fmt.Errorf("GCM metric %s not found (cluster_name=%s)",
metricType, nt.ClusterName)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
golang.org/x/net v0.8.0
golang.org/x/oauth2 v0.6.0
google.golang.org/api v0.108.0
google.golang.org/genproto v0.0.0-20230124163310-31e0e69b6fc2
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.26.7
k8s.io/apiextensions-apiserver v0.26.7
Expand Down Expand Up @@ -136,7 +137,6 @@ require (
golang.org/x/time v0.3.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230124163310-31e0e69b6fc2 // indirect
google.golang.org/grpc v1.51.0 // indirect
google.golang.org/protobuf v1.29.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
35 changes: 34 additions & 1 deletion pkg/metrics/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,39 @@ processors:
- cluster_scoped_resource_count
- resource_ns_count
- api_duration_seconds
# Aggregate some metrics sent to Cloud Monitoring to remove high-cardinality labels (e.g. "commit")
metricstransform/cloudmonitoring:
transforms:
- include: last_sync_timestamp
action: update
operations:
- action: aggregate_labels
label_set:
- configsync.sync.kind
- configsync.sync.name
- configsync.sync.namespace
- status
aggregation_type: max
- include: declared_resources
action: update
new_name: current_declared_resources
operations:
- action: aggregate_labels
label_set:
- configsync.sync.kind
- configsync.sync.name
- configsync.sync.namespace
aggregation_type: max
- include: apply_duration_seconds
action: update
operations:
- action: aggregate_labels
label_set:
- configsync.sync.kind
- configsync.sync.name
- configsync.sync.namespace
- status
aggregation_type: max
filter/kubernetes:
metrics:
include:
Expand Down Expand Up @@ -250,7 +283,7 @@ service:
pipelines:
metrics/cloudmonitoring:
receivers: [opencensus]
processors: [batch, filter/cloudmonitoring, resourcedetection]
processors: [batch, filter/cloudmonitoring, metricstransform/cloudmonitoring, resourcedetection]
exporters: [googlecloud]
metrics/prometheus:
receivers: [opencensus]
Expand Down
Loading

0 comments on commit 8ab20cc

Please sign in to comment.