diff --git a/flake.lock b/flake.lock index 789724cc76..35448561b1 100644 --- a/flake.lock +++ b/flake.lock @@ -2,11 +2,11 @@ "nodes": { "nixpkgs": { "locked": { - "lastModified": 1725103162, - "narHash": "sha256-Ym04C5+qovuQDYL/rKWSR+WESseQBbNAe5DsXNx5trY=", + "lastModified": 1727348695, + "narHash": "sha256-J+PeFKSDV+pHL7ukkfpVzCOO7mBSrrpJ3svwBFABbhI=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "12228ff1752d7b7624a54e9c1af4b222b3c1073b", + "rev": "1925c603f17fc89f4c8f6bf6f631a802ad85d784", "type": "github" }, "original": { diff --git a/jobs/metricsforwarder/spec b/jobs/metricsforwarder/spec index d8feaf1be3..9413c8dc96 100644 --- a/jobs/metricsforwarder/spec +++ b/jobs/metricsforwarder/spec @@ -24,6 +24,10 @@ templates: policy_db.crt.erb: config/certs/policy_db/crt policy_db.key.erb: config/certs/policy_db/key + binding_db_ca.crt.erb: config/certs/binding_db/ca.crt + binding_db.crt.erb: config/certs/binding_db/crt + binding_db.key.erb: config/certs/binding_db/key + storedprocedure_db_ca.crt.erb: config/certs/storedprocedure_db/ca.crt storedprocedure_db.crt.erb: config/certs/storedprocedure_db/crt storedprocedure_db.key.erb: config/certs/storedprocedure_db/key @@ -126,6 +130,37 @@ properties: autoscaler.policy_db_connection_config.connection_max_lifetime: default: 60s + autoscaler.binding_db.address: + description: "IP address on which the bindingdb server will listen" + default: "autoscalerpostgres.service.cf.internal" + autoscaler.binding_db.databases: + description: "The list of databases used in bindingdb database including name" + autoscaler.binding_db.db_scheme: + description: "Database scheme to be used to access bindingdb" + default: postgres + autoscaler.binding_db.port: + description: "Port on which the bindingdb server will listen" + autoscaler.binding_db.roles: + description: "The list of database roles used in bindingdb database including name/password" + autoscaler.binding_db.tls.ca: + default: '' + description: 'PEM-encoded ca certificate for TLS database server' + autoscaler.binding_db.tls.certificate: + default: '' + description: 'PEM-encoded certificate for TLS database client' + autoscaler.binding_db.tls.private_key: + default: '' + description: 'PEM-encoded key for TLS database client' + autoscaler.binding_db.sslmode: + default: disable + description: "sslmode to connect to postgres server" + autoscaler.binding_db_connection_config.max_open_connections: + default: 20 + autoscaler.binding_db_connection_config.max_idle_connections: + default: 10 + autoscaler.binding_db_connection_config.connection_max_lifetime: + default: 60s + autoscaler.storedprocedure_db.address: description: "IP address on which the storedproceduredb server will listen" default: "" diff --git a/jobs/metricsforwarder/templates/binding_db.crt.erb b/jobs/metricsforwarder/templates/binding_db.crt.erb new file mode 100644 index 0000000000..f848f28d79 --- /dev/null +++ b/jobs/metricsforwarder/templates/binding_db.crt.erb @@ -0,0 +1,3 @@ +<% if_p("autoscaler.binding_db.tls.certificate") do |value| %> +<%= value %> +<% end %> \ No newline at end of file diff --git a/jobs/metricsforwarder/templates/binding_db.key.erb b/jobs/metricsforwarder/templates/binding_db.key.erb new file mode 100644 index 0000000000..5aee693cbc --- /dev/null +++ b/jobs/metricsforwarder/templates/binding_db.key.erb @@ -0,0 +1,3 @@ +<% if_p("autoscaler.binding_db.tls.private_key") do |value| %> +<%= value %> +<% end %> \ No newline at end of file diff --git a/jobs/metricsforwarder/templates/binding_db_ca.crt.erb b/jobs/metricsforwarder/templates/binding_db_ca.crt.erb new file mode 100644 index 0000000000..5935a893b1 --- /dev/null +++ b/jobs/metricsforwarder/templates/binding_db_ca.crt.erb @@ -0,0 +1,3 @@ +<% if_p("autoscaler.binding_db.tls.ca") do |value| %> +<%= value %> +<% end %> \ No newline at end of file diff --git a/jobs/metricsforwarder/templates/metricsforwarder.yml.erb b/jobs/metricsforwarder/templates/metricsforwarder.yml.erb index e518676955..7bb5df81dc 100644 --- a/jobs/metricsforwarder/templates/metricsforwarder.yml.erb +++ b/jobs/metricsforwarder/templates/metricsforwarder.yml.erb @@ -38,6 +38,7 @@ end ########################################### job_name = 'metricsforwarder' policy_db_url = build_db_url('policy_db', job_name) + binding_db_url = build_db_url('binding_db', job_name) if p("autoscaler.storedprocedure_db.address") != '' storedprocedure_db_url = build_db_url('storedprocedure_db', job_name) end @@ -80,6 +81,11 @@ db: max_open_connections: <%= p("autoscaler.policy_db_connection_config.max_open_connections") %> max_idle_connections: <%= p("autoscaler.policy_db_connection_config.max_idle_connections") %> connection_max_lifetime: <%= p("autoscaler.policy_db_connection_config.connection_max_lifetime") %> + binding_db: + url: <%= binding_db_url %> + max_open_connections: <%= p("autoscaler.binding_db_connection_config.max_open_connections") %> + max_idle_connections: <%= p("autoscaler.binding_db_connection_config.max_idle_connections") %> + connection_max_lifetime: <%= p("autoscaler.binding_db_connection_config.connection_max_lifetime") %> <% if p("autoscaler.storedprocedure_db.address") != '' %> storedprocedure_db: url: <%= storedprocedure_db_url %> diff --git a/scripts/generate_test_certs.sh b/scripts/generate_test_certs.sh index d306ac0d4d..d65a9e3d24 100755 --- a/scripts/generate_test_certs.sh +++ b/scripts/generate_test_certs.sh @@ -102,7 +102,7 @@ if [[ "$OPENSSL_VERSION" == LibreSSL* ]]; then echo "OpenSSL needs to be used rather than LibreSSL" exit 1 fi -# valid certificate +# valid client certificates echo "${depot_path}" openssl req -new -newkey rsa:2048 -nodes -subj "/CN=sap.com/O=SAP SE/OU=organization:AB1234ORG/OU=app:an-app-id/OU=space:AB1234SPACE" -out "${depot_path}"/validmtls_client-1.csr openssl x509 -req -in "${depot_path}"/validmtls_client-1.csr -CA "${depot_path}"/valid-mtls-local-ca-1.crt -CAkey "${depot_path}"/valid-mtls-local-ca-1.key -CAcreateserial -out "${depot_path}"/validmtls_client-1.crt -days 365 -sha256 diff --git a/spec/fixtures/metricsforwarder.yml b/spec/fixtures/metricsforwarder.yml index 434d1bbfcb..8d7528741a 100644 --- a/spec/fixtures/metricsforwarder.yml +++ b/spec/fixtures/metricsforwarder.yml @@ -14,6 +14,21 @@ autoscaler: ca: BEGIN---CA---END certificate: BEGIN---CERT---END private_key: BEGIN---KEY---END + binding_db: + address: 10.11.137.101 + databases: + - name: foo + tag: default + db_scheme: postgres + port: 5432 + roles: + - name: foo + password: default + tag: default + tls: + ca: BEGIN---CA---END + certificate: BEGIN---CERT---END + private_key: BEGIN---KEY---END cf: api: https://api.cf.domain auth_endpoint: https://login.cf.domain diff --git a/spec/jobs/metricsforwarder/metricsforwarder_spec.rb b/spec/jobs/metricsforwarder/metricsforwarder_spec.rb index 140ca46a92..1496fc7249 100644 --- a/spec/jobs/metricsforwarder/metricsforwarder_spec.rb +++ b/spec/jobs/metricsforwarder/metricsforwarder_spec.rb @@ -114,6 +114,20 @@ end end end + context "binding_db" do + it "includes the ca, cert and key in url when configured" do + rendered_template["db"]["binding_db"]["url"].tap do |url| + check_if_certs_in_url(url, "binding_db") + end + end + + it "does not include the ca, cert and key in url when not configured" do + properties["autoscaler"]["binding_db"]["tls"] = nil + rendered_template["db"]["binding_db"]["url"].tap do |url| + check_if_certs_not_in_url(url, "binding_db") + end + end + end end end end diff --git a/src/acceptance/app/app_suite_test.go b/src/acceptance/app/app_suite_test.go index 1f506b778d..fd832d3184 100644 --- a/src/acceptance/app/app_suite_test.go +++ b/src/acceptance/app/app_suite_test.go @@ -29,8 +29,12 @@ var ( instanceName string initialInstanceCount int - appName string - appGUID string + appToScaleName string + appToScaleGUID string + + metricProducerAppName string + + metricProducerAppGUID string ) const componentName = "Application Scale Suite" @@ -59,10 +63,15 @@ func AppAfterEach() { if os.Getenv("SKIP_TEARDOWN") == "true" { fmt.Println("Skipping Teardown...") } else { - DebugInfo(cfg, setup, appName) - if appName != "" { - DeleteService(cfg, instanceName, appName) - DeleteTestApp(appName, cfg.DefaultTimeoutDuration()) + DebugInfo(cfg, setup, appToScaleName) + if appToScaleName != "" { + DeleteService(cfg, instanceName, appToScaleName) + DeleteTestApp(appToScaleName, cfg.DefaultTimeoutDuration()) + } + if metricProducerAppName != "" { + DebugInfo(cfg, setup, metricProducerAppName) + DeleteService(cfg, instanceName, metricProducerAppName) + DeleteTestApp(metricProducerAppName, cfg.DefaultTimeoutDuration()) } } } diff --git a/src/acceptance/app/cf_metadata_test.go b/src/acceptance/app/cf_metadata_test.go index 4683f454df..96d4246f5e 100644 --- a/src/acceptance/app/cf_metadata_test.go +++ b/src/acceptance/app/cf_metadata_test.go @@ -15,19 +15,19 @@ var _ = Describe("AutoScaler CF metadata support", func() { ) BeforeEach(func() { policy = GenerateDynamicScaleOutAndInPolicy(1, 2, "test_metric", 500, 500) - appName = CreateTestApp(cfg, "labeled-go_app", 1) - appGUID, err = GetAppGuid(cfg, appName) + appToScaleName = CreateTestApp(cfg, "labeled-go_app", 1) + appToScaleGUID, err = GetAppGuid(cfg, appToScaleName) Expect(err).NotTo(HaveOccurred()) - instanceName = CreatePolicy(cfg, appName, appGUID, policy) - StartApp(appName, cfg.CfPushTimeoutDuration()) + instanceName = CreatePolicy(cfg, appToScaleName, appToScaleGUID, policy) + StartApp(appToScaleName, cfg.CfPushTimeoutDuration()) }) AfterEach(AppAfterEach) When("the label app-autoscaler.cloudfoundry.org/disable-autoscaling is set", func() { It("should not scale out", func() { By("Set the label app-autoscaler.cloudfoundry.org/disable-autoscaling to true") - SetLabel(cfg, appGUID, "app-autoscaler.cloudfoundry.org/disable-autoscaling", "true") - scaleOut := sendMetricToAutoscaler(cfg, appGUID, appName, 550, true) + SetLabel(cfg, appToScaleGUID, "app-autoscaler.cloudfoundry.org/disable-autoscaling", "true") + scaleOut := sendMetricToAutoscaler(cfg, appToScaleGUID, appToScaleName, 550, true) Consistently(scaleOut). WithTimeout(5 * time.Minute). WithPolling(15 * time.Second). diff --git a/src/acceptance/app/custom_metric_test.go b/src/acceptance/app/custom_metric_test.go index dd098a7dcb..b17d589a94 100644 --- a/src/acceptance/app/custom_metric_test.go +++ b/src/acceptance/app/custom_metric_test.go @@ -4,75 +4,140 @@ import ( "acceptance" "acceptance/config" . "acceptance/helpers" + "fmt" "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) -var _ = Describe("AutoScaler custom metrics policy", func() { +var _ = Describe("AutoScaler custom metrics", func() { var ( policy string err error ) BeforeEach(func() { - policy = GenerateDynamicScaleOutAndInPolicy(1, 2, "test_metric", 500, 500) - appName = CreateTestApp(cfg, "node-custom-metric", 1) - appGUID, err = GetAppGuid(cfg, appName) + + appToScaleName = CreateTestApp(cfg, "go-custom-metric", 1) + appToScaleGUID, err = GetAppGuid(cfg, appToScaleName) Expect(err).NotTo(HaveOccurred()) - instanceName = CreatePolicy(cfg, appName, appGUID, policy) - StartApp(appName, cfg.CfPushTimeoutDuration()) + }) AfterEach(AppAfterEach) - // This test will fail if credential-type is set to X509 in autoscaler broker. - // Therefore, only mtls connection will be supported for custom metrics in future - Context("when scaling by custom metrics", func() { - It("should scale out and scale in", Label(acceptance.LabelSmokeTests), func() { - By("Scale out to 2 instances") - scaleOut := sendMetricToAutoscaler(cfg, appGUID, appName, 550, false) - Eventually(scaleOut). - WithTimeout(5 * time.Minute). - WithPolling(15 * time.Second). - Should(Equal(2)) - - By("Scale in to 1 instances") - scaleIn := sendMetricToAutoscaler(cfg, appGUID, appName, 100, false) - Eventually(scaleIn). - WithTimeout(5 * time.Minute). - WithPolling(15 * time.Second). - Should(Equal(1)) + Describe("custom metrics policy for same app", func() { + BeforeEach(func() { + policy = GenerateDynamicScaleOutAndInPolicy(1, 2, "test_metric", 500, 500) + instanceName = CreatePolicy(cfg, appToScaleName, appToScaleGUID, policy) + StartApp(appToScaleName, cfg.CfPushTimeoutDuration()) + }) + // This test will fail if credential-type is set to X509 in autoscaler broker. + // Therefore, only mtls connection will be supported for custom metrics in future + Context("when scaling by custom metrics", func() { + BeforeEach(func() { + //instanceName = CreatePolicy(cfg, appToScaleName, appToScaleGUID, policy) + //StartApp(appToScaleName, cfg.CfPushTimeoutDuration()) + }) + It("should scale out and scale in", Label(acceptance.LabelSmokeTests), func() { + By("Scale out to 2 instances") + scaleOut := sendMetricToAutoscaler(cfg, appToScaleGUID, appToScaleName, 550, false) + Eventually(scaleOut). + WithTimeout(5 * time.Minute). + WithPolling(15 * time.Second). + Should(Equal(2)) + + By("Scale in to 1 instances") + scaleIn := sendMetricToAutoscaler(cfg, appToScaleGUID, appToScaleName, 100, false) + Eventually(scaleIn). + WithTimeout(5 * time.Minute). + WithPolling(15 * time.Second). + Should(Equal(1)) + + }) + }) + + Context("when scaling by custom metrics via mtls", func() { + BeforeEach(func() { + //instanceName = CreatePolicy(cfg, appToScaleName, appToScaleGUID, policy) + //StartApp(appToScaleName, cfg.CfPushTimeoutDuration()) + }) + It("should scale out and scale in", Label(acceptance.LabelSmokeTests), func() { + By("Scale out to 2 instances") + scaleOut := sendMetricToAutoscaler(cfg, appToScaleGUID, appToScaleName, 550, true) + Eventually(scaleOut). + WithTimeout(5 * time.Minute). + WithPolling(15 * time.Second). + Should(Equal(2)) + By("Scale in to 1 instance") + scaleIn := sendMetricToAutoscaler(cfg, appToScaleGUID, appToScaleName, 100, true) + Eventually(scaleIn). + WithTimeout(5 * time.Minute). + WithPolling(15 * time.Second). + Should(Equal(1)) + }) }) }) - Context("when scaling by custom metrics via mtls", func() { - It("should scale out and scale in", Label(acceptance.LabelSmokeTests), func() { - By("Scale out to 2 instances") - scaleOut := sendMetricToAutoscaler(cfg, appGUID, appName, 550, true) - Eventually(scaleOut). - WithTimeout(5 * time.Minute). - WithPolling(15 * time.Second). - Should(Equal(2)) - - By("Scale in to 1 instance") - scaleIn := sendMetricToAutoscaler(cfg, appGUID, appName, 100, true) - Eventually(scaleIn). - WithTimeout(5 * time.Minute). - WithPolling(15 * time.Second). - Should(Equal(1)) + Describe("Custom metrics with producer app", func() { + BeforeEach(func() { + // attach policy to appToScale B + policy = GenerateBindingsWithScalingPolicy("bound_app", 1, 2, "test_metric", 100, 500) + instanceName = CreatePolicy(cfg, appToScaleName, appToScaleGUID, policy) + StartApp(appToScaleName, cfg.CfPushTimeoutDuration()) + + // push producer app without policy + metricProducerAppName = CreateTestApp(cfg, "go-custom_metric_producer-app", 1) + metricProducerAppGUID, err = GetAppGuid(cfg, metricProducerAppName) + Expect(err).NotTo(HaveOccurred()) + err := BindServiceToAppWithPolicy(cfg, metricProducerAppName, instanceName, "") + Expect(err).NotTo(HaveOccurred()) + StartApp(metricProducerAppName, cfg.CfPushTimeoutDuration()) + + }) + Context("producer app A sends custom metrics for appToScale B via mtls", func() { + When("policy is attached with the appToScale B with bound_app", func() { + BeforeEach(func() { + policy = GenerateBindingsWithScalingPolicy("bound_app", 1, 2, "test_metric", 100, 500) + }) + It("should scale out and scale in app B", Label(acceptance.LabelSmokeTests), func() { + By(fmt.Sprintf("Scale out %s to 2 instance", appToScaleName)) + scaleOut := sendMetricToAutoscaler(cfg, appToScaleGUID, metricProducerAppName, 550, true) + Eventually(scaleOut). + WithTimeout(5 * time.Minute). + WithPolling(15 * time.Second). + Should(Equal(2)) + + By(fmt.Sprintf("Scale in %s to 1 instance", appToScaleName)) + scaleIn := sendMetricToAutoscaler(cfg, appToScaleGUID, metricProducerAppName, 80, true) + Eventually(scaleIn). + WithTimeout(5 * time.Minute). + WithPolling(15 * time.Second). + Should(Equal(1)) + }) + }) + }) + Context("appToScale B tries to send metrics for producer app A with strategy same_app", func() { + BeforeEach(func() { + policy = GenerateBindingsWithScalingPolicy("same_app", 1, 2, "test_metric", 100, 500) + }) + It("should not scale producer app", func() { + By(fmt.Sprintf("Fail Scale %s ", metricProducerAppName)) + sendMetricToAutoscaler(cfg, metricProducerAppGUID, appToScaleName, 550, true) + WaitForNInstancesRunning(metricProducerAppGUID, 1, 5*time.Second, "expected 1 instance running") + }) }) }) }) -func sendMetricToAutoscaler(config *config.Config, appGUID string, appName string, metricThreshold int, mtls bool) func() (int, error) { +func sendMetricToAutoscaler(config *config.Config, appToScaleGUID string, metricProducerAppName string, metricThreshold int, mtls bool) func() (int, error) { return func() (int, error) { if mtls { - SendMetricMTLS(config, appName, metricThreshold) + SendMetricMTLS(config, appToScaleGUID, metricProducerAppName, metricThreshold) } else { - SendMetric(config, appName, metricThreshold) + SendMetric(config, metricProducerAppName, metricThreshold) } - return RunningInstances(appGUID, 5*time.Second) + return RunningInstances(appToScaleGUID, 5*time.Second) } } diff --git a/src/acceptance/app/dynamic_policy_test.go b/src/acceptance/app/dynamic_policy_test.go index a161c22e5c..79cfbd92e2 100644 --- a/src/acceptance/app/dynamic_policy_test.go +++ b/src/acceptance/app/dynamic_policy_test.go @@ -26,12 +26,12 @@ var _ = Describe("AutoScaler dynamic policy", func() { const minimalMemoryUsage = 17 // observed minimal memory usage by the test app JustBeforeEach(func() { - appName = CreateTestApp(cfg, "dynamic-policy", initialInstanceCount) + appToScaleName = CreateTestApp(cfg, "dynamic-policy", initialInstanceCount) - appGUID, err = GetAppGuid(cfg, appName) + appToScaleGUID, err = GetAppGuid(cfg, appToScaleName) Expect(err).NotTo(HaveOccurred()) - StartApp(appName, cfg.CfPushTimeoutDuration()) - instanceName = CreatePolicy(cfg, appName, appGUID, policy) + StartApp(appToScaleName, cfg.CfPushTimeoutDuration()) + instanceName = CreatePolicy(cfg, appToScaleName, appToScaleGUID, policy) }) BeforeEach(func() { maxHeapLimitMb = cfg.NodeMemoryLimit - minimalMemoryUsage @@ -52,16 +52,16 @@ var _ = Describe("AutoScaler dynamic policy", func() { It("should scale out and then back in.", func() { By(fmt.Sprintf("Use heap %d MB of heap on app", int64(heapToUse))) - CurlAppInstance(cfg, appName, 0, fmt.Sprintf("/memory/%d/5", int64(heapToUse))) + CurlAppInstance(cfg, appToScaleName, 0, fmt.Sprintf("/memory/%d/5", int64(heapToUse))) By("wait for scale to 2") - WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 2, 5*time.Minute) By("Drop memory used by app") - CurlAppInstance(cfg, appName, 0, "/memory/close") + CurlAppInstance(cfg, appToScaleName, 0, "/memory/close") By("Wait for scale to minimum instances") - WaitForNInstancesRunning(appGUID, 1, 5*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 1, 5*time.Minute) }) }) }) @@ -78,16 +78,16 @@ var _ = Describe("AutoScaler dynamic policy", func() { It("should scale out and back in", func() { heapToUse := min(maxHeapLimitMb, int(float64(cfg.NodeMemoryLimit)*0.80)) By(fmt.Sprintf("use 80%% or %d MB of memory in app", heapToUse)) - CurlAppInstance(cfg, appName, 0, fmt.Sprintf("/memory/%d/5", heapToUse)) + CurlAppInstance(cfg, appToScaleName, 0, fmt.Sprintf("/memory/%d/5", heapToUse)) By("Wait for scale to 2 instances") - WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 2, 5*time.Minute) By("drop memory used") - CurlAppInstance(cfg, appName, 0, "/memory/close") + CurlAppInstance(cfg, appToScaleName, 0, "/memory/close") By("Wait for scale down to 1 instance") - WaitForNInstancesRunning(appGUID, 1, 5*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 1, 5*time.Minute) }) }) }) @@ -112,7 +112,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) JustBeforeEach(func() { - appUri := cfh.AppUri(appName, "/responsetime/slow/100", cfg) + appUri := cfh.AppUri(appToScaleName, "/responsetime/slow/100", cfg) ticker = time.NewTicker(1 * time.Second) rps := 5 go func(chan bool) { @@ -131,7 +131,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) It("should scale out", Label(acceptance.LabelSmokeTests), func() { - WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 2, 5*time.Minute) }) }) @@ -143,7 +143,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) JustBeforeEach(func() { - appUri := cfh.AppUri(appName, "/responsetime/slow/100", cfg) + appUri := cfh.AppUri(appToScaleName, "/responsetime/slow/100", cfg) ticker = time.NewTicker(1 * time.Second) rps := 5 go func(chan bool) { @@ -162,7 +162,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) It("should scale in", func() { - WaitForNInstancesRunning(appGUID, 1, 5*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 1, 5*time.Minute) }) }) @@ -187,7 +187,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) JustBeforeEach(func() { - appUri := cfh.AppUri(appName, "/responsetime/fast", cfg) + appUri := cfh.AppUri(appToScaleName, "/responsetime/fast", cfg) ticker = time.NewTicker(1 * time.Second) rps := 20 go func(chan bool) { @@ -206,7 +206,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) It("should scale out", func() { - WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 2, 5*time.Minute) }) }) @@ -218,7 +218,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) JustBeforeEach(func() { - appUri := cfh.AppUri(appName, "/responsetime/fast", cfg) + appUri := cfh.AppUri(appToScaleName, "/responsetime/fast", cfg) ticker = time.NewTicker(1 * time.Second) rps := 20 go func(chan bool) { @@ -239,7 +239,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { It("should scale in", func() { // because we are generating 20rps but starting with 2 instances, // there should be on average 10rps per instance, which should trigger the scale in - WaitForNInstancesRunning(appGUID, 1, 5*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 1, 5*time.Minute) }) }) }) @@ -254,14 +254,14 @@ var _ = Describe("AutoScaler dynamic policy", func() { It("when cpu is greater than scaling out threshold", func() { By("should scale out to 2 instances") - StartCPUUsage(cfg, appName, int(float64(cfg.CPUUpperThreshold)*0.9), 5) - WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) + StartCPUUsage(cfg, appToScaleName, int(float64(cfg.CPUUpperThreshold)*0.9), 5) + WaitForNInstancesRunning(appToScaleGUID, 2, 5*time.Minute) By("should scale in to 1 instance after cpu usage is reduced") //only hit the one instance that was asked to run hot. - StopCPUUsage(cfg, appName, 0) + StopCPUUsage(cfg, appToScaleName, 0) - WaitForNInstancesRunning(appGUID, 1, 10*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 1, 10*time.Minute) }) }) @@ -285,16 +285,16 @@ var _ = Describe("AutoScaler dynamic policy", func() { // - app memory = 1GB // - app CPU entitlement = 4096[total shares] / (32[GB host ram] * 1024) * (1[app memory in GB] * 1024) * 0,1953 ~= 25% - ScaleMemory(cfg, appName, cfg.CPUUtilScalingPolicyTest.AppMemory) + ScaleMemory(cfg, appToScaleName, cfg.CPUUtilScalingPolicyTest.AppMemory) // cpuutil will be 100% if cpu usage is reaching the value of cpu entitlement maxCPUUsage := cfg.CPUUtilScalingPolicyTest.AppCPUEntitlement - StartCPUUsage(cfg, appName, maxCPUUsage, 5) - WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) + StartCPUUsage(cfg, appToScaleName, maxCPUUsage, 5) + WaitForNInstancesRunning(appToScaleGUID, 2, 5*time.Minute) // only hit the one instance that was asked to run hot - StopCPUUsage(cfg, appName, 0) - WaitForNInstancesRunning(appGUID, 1, 5*time.Minute) + StopCPUUsage(cfg, appToScaleName, 0) + WaitForNInstancesRunning(appToScaleGUID, 1, 5*time.Minute) }) }) @@ -305,14 +305,14 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) It("should scale out and in", func() { - ScaleDisk(cfg, appName, "1GB") + ScaleDisk(cfg, appToScaleName, "1GB") - StartDiskUsage(cfg, appName, 800, 5) - WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) + StartDiskUsage(cfg, appToScaleName, 800, 5) + WaitForNInstancesRunning(appToScaleGUID, 2, 5*time.Minute) // only hit the one instance that was asked to occupy disk space - StopDiskUsage(cfg, appName, 0) - WaitForNInstancesRunning(appGUID, 1, 5*time.Minute) + StopDiskUsage(cfg, appToScaleName, 0) + WaitForNInstancesRunning(appToScaleGUID, 1, 5*time.Minute) }) }) @@ -323,14 +323,14 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) It("should scale out and in", func() { - ScaleDisk(cfg, appName, "1GB") + ScaleDisk(cfg, appToScaleName, "1GB") - StartDiskUsage(cfg, appName, 800, 5) - WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) + StartDiskUsage(cfg, appToScaleName, 800, 5) + WaitForNInstancesRunning(appToScaleGUID, 2, 5*time.Minute) // only hit the one instance that was asked to occupy disk space - StopDiskUsage(cfg, appName, 0) - WaitForNInstancesRunning(appGUID, 1, 5*time.Minute) + StopDiskUsage(cfg, appToScaleName, 0) + WaitForNInstancesRunning(appToScaleGUID, 1, 5*time.Minute) }) }) }) diff --git a/src/acceptance/app/lead_times_test.go b/src/acceptance/app/lead_times_test.go index 876a15e34b..f833ee348c 100644 --- a/src/acceptance/app/lead_times_test.go +++ b/src/acceptance/app/lead_times_test.go @@ -15,11 +15,11 @@ var _ = Describe("Autoscaler lead times for scaling", func() { ) BeforeEach(func() { policy = GenerateDynamicScaleOutAndInPolicy(1, 2, "test_metric", 500, 500) - appName = CreateTestApp(cfg, "labeled-go_app", 1) - appGUID, err = GetAppGuid(cfg, appName) + appToScaleName = CreateTestApp(cfg, "labeled-go_app", 1) + appToScaleGUID, err = GetAppGuid(cfg, appToScaleName) Expect(err).NotTo(HaveOccurred()) - instanceName = CreatePolicy(cfg, appName, appGUID, policy) - StartApp(appName, cfg.CfPushTimeoutDuration()) + instanceName = CreatePolicy(cfg, appToScaleName, appToScaleGUID, policy) + StartApp(appToScaleName, cfg.CfPushTimeoutDuration()) }) AfterEach(AppAfterEach) @@ -29,8 +29,8 @@ var _ = Describe("Autoscaler lead times for scaling", func() { coolDown := TestCoolDownSeconds * time.Second scalingTimewindow := 130 * time.Second // be friendly and allow some time for "internal autoscaler processes" (metric polling interval etc.) to take place before actual scaling happens - sendMetricForScaleOutAndReturnNumInstancesFunc := sendMetricToAutoscaler(cfg, appGUID, appName, 510, false) - sendMetricForScaleInAndReturnNumInstancesFunc := sendMetricToAutoscaler(cfg, appGUID, appName, 490, false) + sendMetricForScaleOutAndReturnNumInstancesFunc := sendMetricToAutoscaler(cfg, appToScaleGUID, appToScaleName, 510, false) + sendMetricForScaleInAndReturnNumInstancesFunc := sendMetricToAutoscaler(cfg, appToScaleGUID, appToScaleName, 490, false) By("checking that no scaling out happens before breach_duration_secs have passed") Consistently(sendMetricForScaleOutAndReturnNumInstancesFunc). diff --git a/src/acceptance/app/recurring_schedule_policy_test.go b/src/acceptance/app/recurring_schedule_policy_test.go index 5888e6b265..8082131d12 100644 --- a/src/acceptance/app/recurring_schedule_policy_test.go +++ b/src/acceptance/app/recurring_schedule_policy_test.go @@ -22,8 +22,8 @@ var _ = Describe("AutoScaler recurring schedule policy", func() { BeforeEach(func() { instanceName = CreateService(cfg) initialInstanceCount = 1 - appName = CreateTestApp(cfg, "recurring-schedule", initialInstanceCount) - appGUID, err = GetAppGuid(cfg, appName) + appToScaleName = CreateTestApp(cfg, "recurring-schedule", initialInstanceCount) + appToScaleGUID, err = GetAppGuid(cfg, appToScaleName) Expect(err).NotTo(HaveOccurred()) }) AfterEach(AppAfterEach) @@ -36,21 +36,21 @@ var _ = Describe("AutoScaler recurring schedule policy", func() { JustBeforeEach(func() { startTime, endTime = getStartAndEndTime(time.UTC, 70*time.Second, time.Duration(interval+120)*time.Second) policy = GenerateDynamicAndRecurringSchedulePolicy(instanceMinCount, 4, 50, "UTC", startTime, endTime, daysOfMonthOrWeek, scheduleInstanceMinCount, 5, scheduleInitialMinInstanceCount) - instanceName = CreatePolicy(cfg, appName, appGUID, policy) - StartApp(appName, cfg.CfPushTimeoutDuration()) + instanceName = CreatePolicy(cfg, appToScaleName, appToScaleGUID, policy) + StartApp(appToScaleName, cfg.CfPushTimeoutDuration()) }) scaleDown := func() { By("setting to initial_min_instance_count") jobRunTime := time.Until(startTime.Add(5 * time.Minute)) - WaitForNInstancesRunning(appGUID, scheduleInitialMinInstanceCount, jobRunTime, "The schedule should initially trigger scaling to initial_min_instance_count %i", scheduleInitialMinInstanceCount) + WaitForNInstancesRunning(appToScaleGUID, scheduleInitialMinInstanceCount, jobRunTime, "The schedule should initially trigger scaling to initial_min_instance_count %i", scheduleInitialMinInstanceCount) By("setting schedule's instance_min_count") jobRunTime = time.Until(endTime) - WaitForNInstancesRunning(appGUID, scheduleInstanceMinCount, jobRunTime, "The schedule should allow scaling down to instance_min_count %i", scheduleInstanceMinCount) + WaitForNInstancesRunning(appToScaleGUID, scheduleInstanceMinCount, jobRunTime, "The schedule should allow scaling down to instance_min_count %i", scheduleInstanceMinCount) By("setting to default instance_min_count") - WaitForNInstancesRunning(appGUID, instanceMinCount, time.Until(endTime.Add(time.Duration(interval+60)*time.Second)), "After the schedule ended scaling down to instance_min_count %i should be possible", instanceMinCount) + WaitForNInstancesRunning(appToScaleGUID, instanceMinCount, time.Until(endTime.Add(time.Duration(interval+60)*time.Second)), "After the schedule ended scaling down to instance_min_count %i should be possible", instanceMinCount) } Context("with days of month", func() { diff --git a/src/acceptance/app/specificdate_schedule_policy_test.go b/src/acceptance/app/specificdate_schedule_policy_test.go index 54af63779f..90ee10e7c9 100644 --- a/src/acceptance/app/specificdate_schedule_policy_test.go +++ b/src/acceptance/app/specificdate_schedule_policy_test.go @@ -21,8 +21,8 @@ var _ = Describe("AutoScaler specific date schedule policy", func() { BeforeEach(func() { instanceName = CreateService(cfg) initialInstanceCount = 1 - appName = CreateTestApp(cfg, "date-schedule", initialInstanceCount) - appGUID, err = GetAppGuid(cfg, appName) + appToScaleName = CreateTestApp(cfg, "date-schedule", initialInstanceCount) + appToScaleGUID, err = GetAppGuid(cfg, appToScaleName) Expect(err).NotTo(HaveOccurred()) }) @@ -34,22 +34,22 @@ var _ = Describe("AutoScaler specific date schedule policy", func() { const scheduledInstanceInit = 3 JustBeforeEach(func() { //TODO the start app needs to be after the binding but the timings require the app been up already. - StartApp(appName, cfg.CfPushTimeoutDuration()) + StartApp(appToScaleName, cfg.CfPushTimeoutDuration()) startDateTime = time.Now().In(time.UTC).Add(1 * time.Minute) endDateTime = startDateTime.Add(time.Duration(interval+120) * time.Second) policy = GenerateSpecificDateSchedulePolicy(startDateTime, endDateTime, scheduleInstanceMin, scheduleInstanceMax, scheduledInstanceInit) - instanceName = CreatePolicy(cfg, appName, appGUID, policy) + instanceName = CreatePolicy(cfg, appToScaleName, appToScaleGUID, policy) }) It("should scale", func() { pollTime := 15 * time.Second By(fmt.Sprintf("waiting for scheduledInstanceInit: %d", scheduledInstanceInit)) - WaitForNInstancesRunning(appGUID, 3, time.Until(startDateTime.Add(1*time.Minute))) + WaitForNInstancesRunning(appToScaleGUID, 3, time.Until(startDateTime.Add(1*time.Minute))) By(fmt.Sprintf("waiting for scheduleInstanceMin: %d", scheduleInstanceMin)) jobRunTime := time.Until(endDateTime) - Eventually(func() (int, error) { return RunningInstances(appGUID, cfg.DefaultTimeoutDuration()) }). + Eventually(func() (int, error) { return RunningInstances(appToScaleGUID, cfg.DefaultTimeoutDuration()) }). //+/- poll time error margin. WithTimeout(jobRunTime + pollTime). WithPolling(pollTime). @@ -58,12 +58,12 @@ var _ = Describe("AutoScaler specific date schedule policy", func() { //+/- poll time error margin. timeout := time.Until(endDateTime) - pollTime By(fmt.Sprintf("waiting till end of schedule %dS and should stay %d instances", int(timeout.Seconds()), scheduleInstanceMin)) - Consistently(func() (int, error) { return RunningInstances(appGUID, jobRunTime) }). + Consistently(func() (int, error) { return RunningInstances(appToScaleGUID, jobRunTime) }). WithTimeout(timeout). WithPolling(pollTime). Should(Equal(2)) - WaitForNInstancesRunning(appGUID, 1, time.Duration(interval+60)*time.Second) + WaitForNInstancesRunning(appToScaleGUID, 1, time.Duration(interval+60)*time.Second) }) }) }) diff --git a/src/acceptance/assets/app/go_app/Makefile b/src/acceptance/assets/app/go_app/Makefile index e4194da52b..ab06d99869 100644 --- a/src/acceptance/assets/app/go_app/Makefile +++ b/src/acceptance/assets/app/go_app/Makefile @@ -119,7 +119,7 @@ deploy: build .PHONY: clean clean: - @echo "# cleaning autoscaler" + @echo "# cleaning go_app" @go clean -cache -testcache @rm --force --recursive './build' @rm --force --recursive './internal/app/appfakes' diff --git a/src/acceptance/assets/app/go_app/internal/app/custom_metrics.go b/src/acceptance/assets/app/go_app/internal/app/custom_metrics.go index 26d919ab8c..0fd629a730 100644 --- a/src/acceptance/assets/app/go_app/internal/app/custom_metrics.go +++ b/src/acceptance/assets/app/go_app/internal/app/custom_metrics.go @@ -19,13 +19,15 @@ import ( //counterfeiter:generate . CustomMetricClient type CustomMetricClient interface { - PostCustomMetric(ctx context.Context, appConfig *cfenv.App, metricsValue float64, metricName string, useMtls bool) error + PostCustomMetric(ctx context.Context, logger logr.Logger, appConfig *cfenv.App, metricsValue float64, metricName string, useMtls bool) error } type CustomMetricAPIClient struct{} var _ CustomMetricClient = &CustomMetricAPIClient{} +var CfenvCurrent = cfenv.Current + func CustomMetricsTests(logger logr.Logger, r *gin.RouterGroup, customMetricTest CustomMetricClient) *gin.RouterGroup { r.GET("/mtls/:name/:value", handleCustomMetricsEndpoint(logger, customMetricTest, true)) r.GET("/:name/:value", handleCustomMetricsEndpoint(logger, customMetricTest, false)) @@ -52,8 +54,11 @@ func handleCustomMetricsEndpoint(logger logr.Logger, customMetricTest CustomMetr Error(c, http.StatusBadRequest, "invalid metric value: %s", err.Error()) return } + // required if producer app is sending metric for appToScaleGuid + appToScaleGuid := c.Query("appToScaleGuid") + appConfig := &cfenv.App{AppID: appToScaleGuid} - err = customMetricTest.PostCustomMetric(c, nil, float64(metricValue), metricName, useMtls) + err = customMetricTest.PostCustomMetric(c, logger, appConfig, float64(metricValue), metricName, useMtls) if err != nil { logger.Error(err, "failed to submit custom metric") Error(c, http.StatusInternalServerError, "failed to submit custom metric: %s", err.Error()) @@ -63,15 +68,20 @@ func handleCustomMetricsEndpoint(logger logr.Logger, customMetricTest CustomMetr } } -func (*CustomMetricAPIClient) PostCustomMetric(ctx context.Context, appConfig *cfenv.App, metricValue float64, metricName string, useMtls bool) error { - var err error - if appConfig == nil { - appConfig, err = cfenv.Current() - if err != nil { - return fmt.Errorf("cloud foundry environment not found %w", err) - } +func (*CustomMetricAPIClient) PostCustomMetric(ctx context.Context, logger logr.Logger, appConfig *cfenv.App, metricValue float64, metricName string, useMtls bool) error { + currentApp, err := CfenvCurrent() + if err != nil { + return fmt.Errorf("cloud foundry environment not found %w", err) + } + // appToScale is provided i.e. producer and consumer app relationship + if appConfig != nil && appConfig.AppID != "" { + logger.Info("producer-app-relationship-found", "appToScaleGuid", appConfig.AppID) + //assuming the producer app has the same autoscaler service credentials as appToScale + appConfig.Services = currentApp.Services + } + if appConfig.AppID == "" { + appConfig = currentApp } - appId := api.GUID(appConfig.AppID) autoscalerCredentials, err := getAutoscalerCredentials(appConfig) if err != nil { @@ -103,7 +113,7 @@ func (*CustomMetricAPIClient) PostCustomMetric(ctx context.Context, appConfig *c } metrics := createSingletonMetric(metricName, metricValue) - + logger.Info("sending metric to autoscaler for app", "appId", appId, "metricName", metricName, "metricValue", metricValue) params := api.V1AppsAppGuidMetricsPostParams{AppGuid: appId} return apiClient.V1AppsAppGuidMetricsPost(ctx, metrics, params) diff --git a/src/acceptance/assets/app/go_app/internal/app/custom_metrics_test.go b/src/acceptance/assets/app/go_app/internal/app/custom_metrics_test.go index 83e0ca6087..c619985d1e 100644 --- a/src/acceptance/assets/app/go_app/internal/app/custom_metrics_test.go +++ b/src/acceptance/assets/app/go_app/internal/app/custom_metrics_test.go @@ -2,8 +2,11 @@ package app_test import ( "context" + "errors" "net/http" + "github.com/go-logr/logr" + "code.cloudfoundry.org/app-autoscaler-release/src/acceptance/assets/app/go_app/internal/app" "code.cloudfoundry.org/app-autoscaler-release/src/acceptance/assets/app/go_app/internal/app/appfakes" api "code.cloudfoundry.org/app-autoscaler-release/src/acceptance/assets/app/go_app/internal/custommetrics" @@ -14,9 +17,8 @@ import ( ) var _ = Describe("custom metrics tests", func() { - + var fakeCustomMetricClient *appfakes.FakeCustomMetricClient Context("custom metrics handler", func() { - fakeCustomMetricClient := &appfakes.FakeCustomMetricClient{} It("should err if value out of bounds", func() { apiTest(nil, nil, nil, nil). @@ -35,6 +37,7 @@ var _ = Describe("custom metrics tests", func() { End() }) It("should post the custom metric", func() { + fakeCustomMetricClient = &appfakes.FakeCustomMetricClient{} apiTest(nil, nil, nil, fakeCustomMetricClient). Get("/custom-metrics/test/4"). Expect(GinkgoT()). @@ -42,17 +45,43 @@ var _ = Describe("custom metrics tests", func() { Body(`{"mtls":false}`). End() Expect(fakeCustomMetricClient.PostCustomMetricCallCount()).To(Equal(1)) - _, _, sentValue, sentMetric, mtlsUsed := fakeCustomMetricClient.PostCustomMetricArgsForCall(0) + _, _, appConfig, sentValue, sentMetric, mtlsUsed := fakeCustomMetricClient.PostCustomMetricArgsForCall(0) + Expect(appConfig.AppID).Should(Equal("")) Expect(sentMetric).Should(Equal("test")) Expect(sentValue).Should(Equal(4.0)) Expect(mtlsUsed).Should(Equal(false)) }) + When("appToScaleGuid is provided in a producer-consumer relationship", func() { + fakeCustomMetricClient = &appfakes.FakeCustomMetricClient{} + It("should post the custom metric with appToScaleGuid", func() { + fakeCustomMetricClient := &appfakes.FakeCustomMetricClient{} + apiTest(nil, nil, nil, fakeCustomMetricClient). + Get("/custom-metrics/test/5"). + QueryParams(map[string]string{"appToScaleGuid": "test-app-id"}). + Expect(GinkgoT()). + Status(http.StatusOK). + Body(`{"mtls":false}`). + End() + Expect(fakeCustomMetricClient.PostCustomMetricCallCount()).To(Equal(1)) + _, _, appConfig, sentValue, sentMetric, mtlsUsed := fakeCustomMetricClient.PostCustomMetricArgsForCall(0) + Expect(appConfig.AppID).Should(Equal("test-app-id")) + Expect(sentMetric).Should(Equal("test")) + Expect(sentValue).Should(Equal(5.0)) + Expect(mtlsUsed).Should(Equal(false)) + }) + }) + }) Context("PostCustomMetrics", func() { - It("should post a custom metric", func() { + var ( + service cfenv.Service + testAppId string + fakeServer *ghttp.Server + ) + BeforeEach(func() { - testAppId := "test-app-id" - fakeServer := ghttp.NewServer() + testAppId = "test-app-id" + fakeServer = ghttp.NewServer() username := "test-user" password := "test-pass" fakeServer.AppendHandlers( @@ -77,24 +106,59 @@ var _ = Describe("custom metrics tests", func() { Password: password, URL: fakeServer.URL(), } - service := cfenv.Service{ + service = cfenv.Service{ Name: "test", Tags: []string{"app-autoscaler"}, Credentials: map[string]interface{}{"custom_metrics": customMetricsCredentials}, } - - appEnv := cfenv.App{ - AppID: testAppId, - Index: 0, - Services: map[string][]cfenv.Service{"autoscaler": {service}}, + }) + It("should post a custom metric", func() { + app.CfenvCurrent = func() (*cfenv.App, error) { + return &cfenv.App{ + AppID: testAppId, + Index: 0, + Services: map[string][]cfenv.Service{"autoscaler": {service}}, + }, nil } - + appEnv, _ := app.CfenvCurrent() client := &app.CustomMetricAPIClient{} - err := client.PostCustomMetric(context.TODO(), &appEnv, 42, "test", false) + err := client.PostCustomMetric(context.TODO(), logr.Logger{}, appEnv, 42, "test", false) Expect(err).ToNot(HaveOccurred()) Expect(len(fakeServer.ReceivedRequests())).To(Equal(1)) fakeServer.Close() }) + Context("verify configs", func() { + When("cloud foundry environment is not found", func() { + It("should return error if cloud foundry environment is not found", func() { + app.CfenvCurrent = func() (*cfenv.App, error) { + return nil, errors.New("cloud foundry environment not found") + } + client := &app.CustomMetricAPIClient{} + err := client.PostCustomMetric(context.TODO(), logr.Logger{}, &cfenv.App{}, 42, "test", false) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("cloud foundry environment not found")) + }) + }) + When("appToScaleGuid is provided in a producer-consumer relationship", func() { + It("should set appConfig with appToScaleGuid and services", func() { + + app.CfenvCurrent = func() (*cfenv.App, error) { + return &cfenv.App{AppID: testAppId, Services: map[string][]cfenv.Service{"autoscaler": {service}}}, nil + } + appConfig, _ := app.CfenvCurrent() + client := &app.CustomMetricAPIClient{} + err := client.PostCustomMetric(context.TODO(), logr.Logger{}, appConfig, 42, "test", false) + Expect(err).ToNot(HaveOccurred()) + Expect(appConfig.Services).NotTo(BeNil()) + Expect(appConfig.AppID).To(Equal(testAppId)) + Expect(len(fakeServer.ReceivedRequests())).To(Equal(1)) + fakeServer.Close() + }) + }) + + }) + }) + }) diff --git a/src/acceptance/assets/file/policy/default_policy-with-configuration.json b/src/acceptance/assets/file/policy/default_policy-with-configuration.json new file mode 100644 index 0000000000..e30e7c3ee2 --- /dev/null +++ b/src/acceptance/assets/file/policy/default_policy-with-configuration.json @@ -0,0 +1,98 @@ +{ + "configuration": { + "custom_metrics": { + "metric_submission_strategy": { + "allow_from": "same_app" + } + } + }, + "instance_min_count": 1, + "instance_max_count": 4, + "scaling_rules": [ + { + "metric_type": "memoryused", + "breach_duration_secs": 600, + "threshold": 30, + "operator": "<", + "cool_down_secs": 300, + "adjustment": "-1" + }, + { + "metric_type": "memoryused", + "breach_duration_secs": 600, + "threshold": 90, + "operator": ">=", + "cool_down_secs": 300, + "adjustment": "+1" + } + ], + "schedules": { + "timezone": "Asia/Shanghai", + "recurring_schedule": [ + { + "start_time": "10:00", + "end_time": "18:00", + "days_of_week": [ + 1, + 2, + 3 + ], + "instance_min_count": 1, + "instance_max_count": 10, + "initial_min_instance_count": 5 + }, + { + "start_date": "2099-06-27", + "end_date": "2099-07-23", + "start_time": "11:00", + "end_time": "19:30", + "days_of_month": [ + 5, + 15, + 25 + ], + "instance_min_count": 3, + "instance_max_count": 10, + "initial_min_instance_count": 5 + }, + { + "start_time": "10:00", + "end_time": "18:00", + "days_of_week": [ + 4, + 5, + 6 + ], + "instance_min_count": 1, + "instance_max_count": 10 + }, + { + "start_time": "11:00", + "end_time": "19:30", + "days_of_month": [ + 10, + 20, + 30 + ], + "instance_min_count": 1, + "instance_max_count": 10 + } + ], + "specific_date": [ + { + "start_date_time": "2099-06-02T10:00", + "end_date_time": "2099-06-15T13:59", + "instance_min_count": 1, + "instance_max_count": 4, + "initial_min_instance_count": 2 + }, + { + "start_date_time": "2099-01-04T20:00", + "end_date_time": "2099-02-19T23:15", + "instance_min_count": 2, + "instance_max_count": 5, + "initial_min_instance_count": 3 + } + ] + } +} diff --git a/src/acceptance/assets/file/policy/policy-with-configuration-same-app.json b/src/acceptance/assets/file/policy/policy-with-configuration-same-app.json new file mode 100644 index 0000000000..e4d492305d --- /dev/null +++ b/src/acceptance/assets/file/policy/policy-with-configuration-same-app.json @@ -0,0 +1,100 @@ +{ + "configuration": { + "custom_metrics": { + "metric_submission_strategy": { + "allow_from": "same_app" + } + } + }, + "instance_min_count": 1, + "instance_max_count": 4, + "scaling_rules": [ + { + "metric_type": "memoryused", + "breach_duration_secs": 600, + "threshold": 30, + "operator": "<", + "cool_down_secs": 300, + "adjustment": "-1" + }, + { + "metric_type": "memoryused", + "breach_duration_secs": 600, + "threshold": 90, + "operator": ">=", + "cool_down_secs": 300, + "adjustment": "+1" + } + ], + "schedules": { + "timezone": "Asia/Shanghai", + "recurring_schedule": [ + { + "start_time": "10:00", + "end_time": "18:00", + "days_of_week": [ + 1, + 2, + 3 + ], + "instance_min_count": 1, + "instance_max_count": 10, + "initial_min_instance_count": 5 + }, + { + "start_date": "2099-06-27", + "end_date": "2099-07-23", + "start_time": "11:00", + "end_time": "19:30", + "days_of_month": [ + 5, + 15, + 25 + ], + "instance_min_count": 3, + "instance_max_count": 10, + "initial_min_instance_count": 5 + }, + { + "start_time": "10:00", + "end_time": "18:00", + "days_of_week": [ + 4, + 5, + 6 + ], + "instance_min_count": 1, + "instance_max_count": 10 + }, + { + "start_time": "11:00", + "end_time": "19:30", + "days_of_month": [ + 10, + 20, + 30 + ], + "instance_min_count": 1, + "instance_max_count": 10 + } + ], + "specific_date": [ + { + "start_date_time": "2099-06-02T10:00", + "end_date_time": "2099-06-15T13:59", + "instance_min_count": 1, + "instance_max_count": 4, + "initial_min_instance_count": 2 + }, + { + "start_date_time": "2099-01-04T20:00", + "end_date_time": "2099-02-19T23:15", + "instance_min_count": 2, + "instance_max_count": 5, + "initial_min_instance_count": 3 + } + ] + } +} + + diff --git a/src/acceptance/assets/file/policy/policy-with-configuration.json b/src/acceptance/assets/file/policy/policy-with-configuration.json new file mode 100644 index 0000000000..94dcf15ae0 --- /dev/null +++ b/src/acceptance/assets/file/policy/policy-with-configuration.json @@ -0,0 +1,24 @@ +{ + "configuration": { + "custom_metrics": { + "metric_submission_strategy": { + "allow_from": "bound_app" + } + } + }, + "instance_max_count":4, + "instance_min_count":1, + "scaling_rules":[ + { + "metric_type":"test_metric", + "threshold":500, + "operator":">", + "adjustment":"+1" + }, { + "metric_type":"test_metric", + "threshold":100, + "operator":"<", + "adjustment":"-1" + } + ] +} diff --git a/src/acceptance/broker/broker_test.go b/src/acceptance/broker/broker_test.go index 97b2475568..c9474b4d0a 100644 --- a/src/acceptance/broker/broker_test.go +++ b/src/acceptance/broker/broker_test.go @@ -95,12 +95,26 @@ var _ = Describe("AutoScaler Service Broker", func() { It("binds&unbinds with policy", func() { policyFile := "../assets/file/policy/all.json" + + err := helpers.BindServiceToAppWithPolicy(cfg, appName, instance.name(), policyFile) + Expect(err).NotTo(HaveOccurred()) + expectedPolicyFile := "../assets/file/policy/policy-with-configuration-same-app.json" + expectedPolicyWithConfig, err := os.ReadFile(expectedPolicyFile) + Expect(err).NotTo(HaveOccurred()) + bindingParameters := helpers.GetServiceCredentialBindingParameters(cfg, instance.name(), appName) + Expect(bindingParameters).Should(MatchJSON(expectedPolicyWithConfig)) + + instance.unbind(appName) + }) + It("binds&unbinds with configurations and policy", func() { + policyFile := "../assets/file/policy/policy-with-configuration.json" policy, err := os.ReadFile(policyFile) + fmt.Println("policy", string(policy)) //FIXME Expect(err).NotTo(HaveOccurred()) err = helpers.BindServiceToAppWithPolicy(cfg, appName, instance.name(), policyFile) Expect(err).NotTo(HaveOccurred()) - + By("checking broker bind parameter response should have policy and configuration") bindingParameters := helpers.GetServiceCredentialBindingParameters(cfg, instance.name(), appName) Expect(bindingParameters).Should(MatchJSON(policy)) @@ -128,10 +142,21 @@ var _ = Describe("AutoScaler Service Broker", func() { instance.unbind(appName) }) - It("bind&unbinds without policy", func() { + It("bind&unbinds without policy and gives only configuration", func() { helpers.BindServiceToApp(cfg, appName, instance.name()) + By("checking broker bind parameter response does not have policy but configuration only") bindingParameters := helpers.GetServiceCredentialBindingParameters(cfg, instance.name(), appName) - Expect(bindingParameters).Should(MatchJSON("{}")) + expectedJSON := `{ + "configuration": { + "custom_metrics": { + "metric_submission_strategy": { + "allow_from": "same_app" + } + } + } + }` + + Expect(bindingParameters).Should(MatchJSON(expectedJSON)) instance.unbind(appName) }) @@ -143,7 +168,7 @@ var _ = Describe("AutoScaler Service Broker", func() { Describe("allows setting default policies", func() { var instance serviceInstance var defaultPolicy []byte - var policy []byte + var expectedDefaultPolicyWithConfigsJSON []byte BeforeEach(func() { instance = createServiceWithParameters(cfg.ServicePlan, "../assets/file/policy/default_policy.json") @@ -159,7 +184,13 @@ var _ = Describe("AutoScaler Service Broker", func() { err = json.Unmarshal(defaultPolicy, &serviceParameters) Expect(err).NotTo(HaveOccurred()) - policy, err = json.Marshal(serviceParameters.DefaultPolicy) + expectedDefaultPolicyWithConfigsJSON, err = os.ReadFile("../assets/file/policy/default_policy-with-configuration.json") + Expect(err).NotTo(HaveOccurred()) + var serviceParametersWithConfigs = struct { + Configuration interface{} `json:"configuration"` + DefaultPolicy interface{} `json:"default_policy"` + }{} + err = json.Unmarshal(expectedDefaultPolicyWithConfigsJSON, &serviceParametersWithConfigs) Expect(err).NotTo(HaveOccurred()) }) @@ -170,9 +201,9 @@ var _ = Describe("AutoScaler Service Broker", func() { It("sets the default policy if no policy is set during binding and allows retrieving the policy via the binding parameters", func() { helpers.BindServiceToApp(cfg, appName, instance.name()) - + By("checking broker bind parameter response should have default policy and configuration") bindingParameters := helpers.GetServiceCredentialBindingParameters(cfg, instance.name(), appName) - Expect(bindingParameters).Should(MatchJSON(policy)) + Expect(bindingParameters).Should(MatchJSON(expectedDefaultPolicyWithConfigsJSON)) unbindService := cf.Cf("unbind-service", appName, instance.name()).Wait(cfg.DefaultTimeoutDuration()) Expect(unbindService).To(Exit(0), "failed unbinding service from app") @@ -192,6 +223,7 @@ var _ = Describe("AutoScaler Service Broker", func() { var instance serviceInstance It("should update a service instance from one plan to another plan", func() { servicePlans := GetServicePlans(cfg) + fmt.Println("servicePlans", servicePlans) source, target, err := servicePlans.getSourceAndTargetForPlanUpdate() Expect(err).NotTo(HaveOccurred(), "failed getting source and target service plans") instance = createService(source.Name) diff --git a/src/acceptance/helpers/apps.go b/src/acceptance/helpers/apps.go index 44390f2882..f44803cb7e 100644 --- a/src/acceptance/helpers/apps.go +++ b/src/acceptance/helpers/apps.go @@ -82,9 +82,9 @@ func SendMetric(cfg *config.Config, appName string, metric int) { cfh.CurlApp(cfg, appName, fmt.Sprintf("/custom-metrics/test_metric/%d", metric), "-f") } -func SendMetricMTLS(cfg *config.Config, appName string, metric int) { +func SendMetricMTLS(cfg *config.Config, appGuid string, appName string, metric int) { GinkgoHelper() - cfh.CurlApp(cfg, appName, fmt.Sprintf("/custom-metrics/mtls/test_metric/%d", metric), "-f") + cfh.CurlApp(cfg, appName, fmt.Sprintf("/custom-metrics/mtls/test_metric/%d?appToScaleGuid=%s", metric, appGuid), "-f") } func StartAppWithErr(appName string, timeout time.Duration) error { @@ -108,7 +108,7 @@ func StartApp(appName string, timeout time.Duration) bool { func CreateTestApp(cfg *config.Config, appType string, initialInstanceCount int) string { appName := generator.PrefixedRandomName(cfg.Prefix, appType) - By("Creating test app") + By(fmt.Sprintf("Creating test app %s", appName)) CreateTestAppByName(cfg, appName, initialInstanceCount) return appName } diff --git a/src/acceptance/helpers/helpers.go b/src/acceptance/helpers/helpers.go index 5af7ec3c68..1cb2124b3c 100644 --- a/src/acceptance/helpers/helpers.go +++ b/src/acceptance/helpers/helpers.go @@ -34,6 +34,27 @@ const ( type Days string +type BindingConfig struct { + Configuration Configuration `json:"configuration"` + ScalingPolicy +} + +type Configuration struct { + CustomMetrics CustomMetricsConfig `json:"custom_metrics"` +} + +type CustomMetricsConfig struct { + Auth Auth `json:"auth,omitempty"` + MetricSubmissionStrategy MetricsSubmissionStrategy `json:"metric_submission_strategy"` +} + +type Auth struct { + CredentialType string `json:"credential_type"` +} +type MetricsSubmissionStrategy struct { + AllowFrom string `json:"allow_from"` +} + type ScalingPolicy struct { InstanceMin int `json:"instance_min_count"` InstanceMax int `json:"instance_max_count"` @@ -162,7 +183,27 @@ func ServicePlansUrl(cfg *config.Config, spaceGuid string) string { return url.String() } +func GenerateBindingsWithScalingPolicy(allowFrom string, instanceMin, instanceMax int, metricName string, scaleInThreshold, scaleOutThreshold int64) string { + bindingConfig := BindingConfig{ + Configuration: Configuration{CustomMetrics: CustomMetricsConfig{ + MetricSubmissionStrategy: MetricsSubmissionStrategy{AllowFrom: allowFrom}, + }}, + ScalingPolicy: buildScaleOutScaleInPolicy(instanceMin, instanceMax, metricName, scaleInThreshold, scaleOutThreshold), + } + marshalledBinding, err := MarshalWithoutHTMLEscape(bindingConfig) + Expect(err).NotTo(HaveOccurred()) + return string(marshalledBinding) +} + func GenerateDynamicScaleOutPolicy(instanceMin, instanceMax int, metricName string, threshold int64) string { + policy := buildScalingPolicy(instanceMin, instanceMax, metricName, threshold) + marshaled, err := MarshalWithoutHTMLEscape(policy) + Expect(err).NotTo(HaveOccurred()) + + return string(marshaled) +} + +func buildScalingPolicy(instanceMin int, instanceMax int, metricName string, threshold int64) ScalingPolicy { scalingOutRule := ScalingRule{ MetricType: metricName, BreachDurationSeconds: TestBreachDurationSeconds, @@ -171,16 +212,12 @@ func GenerateDynamicScaleOutPolicy(instanceMin, instanceMax int, metricName stri CoolDownSeconds: TestCoolDownSeconds, Adjustment: "+1", } - policy := ScalingPolicy{ InstanceMin: instanceMin, InstanceMax: instanceMax, ScalingRules: []*ScalingRule{&scalingOutRule}, } - marshaled, err := MarshalWithoutHTMLEscape(policy) - Expect(err).NotTo(HaveOccurred()) - - return string(marshaled) + return policy } func GenerateDynamicScaleOutPolicyWithExtraFields(instanceMin, instanceMax int, metricName string, threshold int64) (string, string) { @@ -224,28 +261,15 @@ func GenerateDynamicScaleOutPolicyWithExtraFields(instanceMin, instanceMax int, return string(extraBytes), string(validBytes) } -func GenerateDynamicScaleInPolicy(instanceMin, instanceMax int, metricName string, threshold int64) string { - scalingInRule := ScalingRule{ - MetricType: metricName, - BreachDurationSeconds: TestBreachDurationSeconds, - Threshold: threshold, - Operator: "<", - CoolDownSeconds: TestCoolDownSeconds, - Adjustment: "-1", - } - - policy := ScalingPolicy{ - InstanceMin: instanceMin, - InstanceMax: instanceMax, - ScalingRules: []*ScalingRule{&scalingInRule}, - } +func GenerateDynamicScaleOutAndInPolicy(instanceMin, instanceMax int, metricName string, scaleInWhenBelowThreshold int64, scaleOutWhenGreaterOrEqualThreshold int64) string { + policy := buildScaleOutScaleInPolicy(instanceMin, instanceMax, metricName, scaleInWhenBelowThreshold, scaleOutWhenGreaterOrEqualThreshold) marshaled, err := MarshalWithoutHTMLEscape(policy) Expect(err).NotTo(HaveOccurred()) return string(marshaled) } -func GenerateDynamicScaleOutAndInPolicy(instanceMin, instanceMax int, metricName string, scaleInWhenBelowThreshold int64, scaleOutWhenGreaterOrEqualThreshold int64) string { +func buildScaleOutScaleInPolicy(instanceMin int, instanceMax int, metricName string, scaleInWhenBelowThreshold int64, scaleOutWhenGreaterOrEqualThreshold int64) ScalingPolicy { scalingOutRule := ScalingRule{ MetricType: metricName, BreachDurationSeconds: TestBreachDurationSeconds, @@ -267,11 +291,7 @@ func GenerateDynamicScaleOutAndInPolicy(instanceMin, instanceMax int, metricName InstanceMax: instanceMax, ScalingRules: []*ScalingRule{&scalingOutRule, &scalingInRule}, } - - marshaled, err := MarshalWithoutHTMLEscape(policy) - Expect(err).NotTo(HaveOccurred()) - - return string(marshaled) + return policy } // GenerateDynamicScaleInPolicyBetween creates a scaling policy that scales down from 2 instances to 1, if the metric value is in a range of [upper, lower]. @@ -460,15 +480,15 @@ func MarshalWithoutHTMLEscape(v interface{}) ([]byte, error) { func CreatePolicy(cfg *config.Config, appName, appGUID, policy string) string { GinkgoHelper() - instanceName, _ := createPolicy(cfg, appName, appGUID, policy) + instanceName, _ := createPolicy(cfg, appName, policy) return instanceName } -func CreatePolicyWithErr(cfg *config.Config, appName, appGUID, policy string) (string, error) { - return createPolicy(cfg, appName, appGUID, policy) +func CreatePolicyWithErr(cfg *config.Config, appName, policy string) (string, error) { + return createPolicy(cfg, appName, policy) } -func createPolicy(cfg *config.Config, appName, appGUID, policy string) (string, error) { +func createPolicy(cfg *config.Config, appName, policy string) (string, error) { GinkgoHelper() instanceName := generator.PrefixedRandomName(cfg.Prefix, cfg.InstancePrefix) err := Retry(defaultRetryAttempt, defaultRetryAfter, func() error { return CreateServiceWithPlan(cfg, cfg.ServicePlan, instanceName) }) @@ -576,23 +596,6 @@ func GetServiceCredentialBindingParameters(cfg *config.Config, instanceName stri return strings.TrimSpace(string(cmd.Out.Contents())) } -func CreatePolicyWithAPI(cfg *config.Config, appGUID, policy string) { - GinkgoHelper() - oauthToken := OauthToken(cfg) - client := GetHTTPClient(cfg) - - policyURL := fmt.Sprintf("%s%s", cfg.ASApiEndpoint, strings.Replace(PolicyPath, "{appId}", appGUID, -1)) - req, err := http.NewRequest("PUT", policyURL, bytes.NewBuffer([]byte(policy))) - Expect(err).ShouldNot(HaveOccurred()) - req.Header.Add("Authorization", oauthToken) - req.Header.Add("Content-Type", "application/json") - - resp, err := client.Do(req) - Expect(err).ShouldNot(HaveOccurred()) - defer func() { _ = resp.Body.Close() }() - Expect(resp).Should(HaveHTTPStatus(200, 201), "assigning policy by putting to %s", policyURL) -} - func GetHTTPClient(cfg *config.Config) *http.Client { return &http.Client{ Transport: &http.Transport{ @@ -627,13 +630,6 @@ func GetAppGuid(cfg *config.Config, appName string) (string, error) { return appGuid, err } -func FailOnCommandFailuref(command *Session, format string, args ...any) *Session { - if command.ExitCode() != 0 { - Fail(fmt.Sprintf(format, args...)) - } - return command -} - func SetLabel(cfg *config.Config, appGUID string, labelKey string, labelValue string) { GinkgoHelper() cmd := cf.Cf("curl", "--fail", fmt.Sprintf("/v3/apps/%s", appGUID), "-X", "PATCH", "-d", fmt.Sprintf(`{"metadata": {"labels": {"%s": "%s"}}}`, labelKey, labelValue)).Wait(cfg.DefaultTimeoutDuration()) diff --git a/src/acceptance/setup_performance/setup_performance_test.go b/src/acceptance/setup_performance/setup_performance_test.go index d6d2462b91..7506a46717 100644 --- a/src/acceptance/setup_performance/setup_performance_test.go +++ b/src/acceptance/setup_performance/setup_performance_test.go @@ -92,12 +92,12 @@ func pushAppAndBindService(appName string, runningApps *int32, pendingApps *sync } policy := helpers.GenerateDynamicScaleOutAndInPolicy( 1, 2, "test_metric", 500, 500) - appGUID, err := helpers.GetAppGuid(cfg, appName) + _, err = helpers.GetAppGuid(cfg, appName) if err != nil { errors.Store(appName, err) return } - _, err = helpers.CreatePolicyWithErr(cfg, appName, appGUID, policy) + _, err = helpers.CreatePolicyWithErr(cfg, appName, policy) if err != nil { errors.Store(appName, err) return diff --git a/src/autoscaler/Makefile b/src/autoscaler/Makefile index 4a8ed8c098..5c2b141a3a 100644 --- a/src/autoscaler/Makefile +++ b/src/autoscaler/Makefile @@ -152,7 +152,7 @@ clean: mta-deploy: mta-build build-extension-file $(MAKE) -f metricsforwarder/Makefile set-security-group @echo "Deploying with extension file: $(EXTENSION_FILE)" - @cf deploy $(MAKEFILE_DIR)/$(DEST)/*.mtar -f --delete-services -e $(EXTENSION_FILE) + @cf deploy $(MAKEFILE_DIR)/$(DEST)/*.mtar --version-rule ALL -f --delete-services -e $(EXTENSION_FILE) build-extension-file: diff --git a/src/autoscaler/api/broker/broker.go b/src/autoscaler/api/broker/broker.go index 05d15baada..c1367aa203 100644 --- a/src/autoscaler/api/broker/broker.go +++ b/src/autoscaler/api/broker/broker.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "net/http" + "reflect" "regexp" "strings" @@ -39,15 +40,17 @@ type Broker struct { } var ( - emptyJSONObject = regexp.MustCompile(`^\s*{\s*}\s*$`) - ErrCreatingServiceBinding = errors.New("error creating service binding") - ErrUpdatingServiceInstance = errors.New("error updating service instance") - ErrDeleteSchedulesForUnbinding = errors.New("failed to delete schedules for unbinding") - ErrBindingDoesNotExist = errors.New("service binding does not exist") - ErrDeletePolicyForUnbinding = errors.New("failed to delete policy for unbinding") - ErrDeleteServiceBinding = errors.New("error deleting service binding") - ErrCredentialNotDeleted = errors.New("failed to delete custom metrics credential for unbinding") - ErrInvalidCredentialType = errors.New("invalid credential type provided: allowed values are [binding-secret, x509]") + emptyJSONObject = regexp.MustCompile(`^\s*{\s*}\s*$`) + ErrCreatingServiceBinding = errors.New("error creating service binding") + ErrUpdatingServiceInstance = errors.New("error updating service instance") + ErrDeleteSchedulesForUnbinding = errors.New("failed to delete schedules for unbinding") + ErrBindingDoesNotExist = errors.New("service binding does not exist") + ErrDeletePolicyForUnbinding = errors.New("failed to delete policy for unbinding") + ErrDeleteServiceBinding = errors.New("error deleting service binding") + ErrCredentialNotDeleted = errors.New("failed to delete custom metrics credential for unbinding") + ErrInvalidCredentialType = errors.New("invalid credential type provided: allowed values are [binding-secret, x509]") + ErrInvalidConfigurations = errors.New("invalid binding configurations provided") + ErrInvalidCustomMetricsStrategy = errors.New("error: custom metrics strategy not supported") ) type Errors []error @@ -496,6 +499,23 @@ func (b *Broker) Bind(ctx context.Context, instanceID string, bindingID string, if details.RawParameters != nil { policyJson = details.RawParameters } + bindingConfiguration := &models.BindingConfig{} + if policyJson != nil { + err := json.Unmarshal(policyJson, &bindingConfiguration) + if err != nil { + actionReadBindingConfiguration := "read-binding-configurations" + logger.Error("unmarshal-binding-configuration", err) + return result, apiresponses.NewFailureResponseBuilder( + ErrInvalidConfigurations, http.StatusBadRequest, actionReadBindingConfiguration). + WithErrorKey(actionReadBindingConfiguration). + Build() + } + } + // set the default custom metrics strategy if not provided + if bindingConfiguration.GetCustomMetricsStrategy() == "" { + bindingConfiguration.SetCustomMetricsStrategy(models.CustomMetricsSameApp) + } + logger.Info("binding-configuration", lager.Data{"bindingConfiguration": bindingConfiguration}) policy, err := b.getPolicyFromJsonRawMessage(policyJson, instanceID, details.PlanID) if err != nil { @@ -529,15 +549,18 @@ func (b *Broker) Bind(ctx context.Context, instanceID string, bindingID string, if err := b.handleExistingBindingsResiliently(ctx, instanceID, appGUID, logger); err != nil { return result, err } + // save custom metrics strategy check - bindingConfiguration.CustomMetricsConfig.MetricSubmissionStrategy ! == "" + err = createServiceBinding(ctx, b.bindingdb, bindingID, instanceID, appGUID, bindingConfiguration.GetCustomMetricsStrategy()) - // create binding in DB - err = b.bindingdb.CreateServiceBinding(ctx, bindingID, instanceID, appGUID) if err != nil { actionCreateServiceBinding := "create-service-binding" logger.Error(actionCreateServiceBinding, err) if errors.Is(err, db.ErrAlreadyExists) { return result, apiresponses.NewFailureResponse(errors.New("error: an autoscaler service instance is already bound to the application and multiple bindings are not supported"), http.StatusConflict, actionCreateServiceBinding) } + if errors.Is(err, ErrInvalidCustomMetricsStrategy) { + return result, apiresponses.NewFailureResponse(err, http.StatusBadRequest, actionCreateServiceBinding) + } return result, apiresponses.NewFailureResponse(ErrCreatingServiceBinding, http.StatusInternalServerError, actionCreateServiceBinding) } customMetricsCredentials := &models.CustomMetricsCredentials{ @@ -688,6 +711,8 @@ func (b *Broker) GetBinding(ctx context.Context, instanceID string, bindingID st if err != nil { return result, err } + bindingConfig := &models.BindingConfig{} + bindingConfig.SetCustomMetricsStrategy(serviceBinding.CustomMetricsStrategy) policy, err := b.policydb.GetAppPolicy(ctx, serviceBinding.AppID) if err != nil { @@ -695,13 +720,29 @@ func (b *Broker) GetBinding(ctx context.Context, instanceID string, bindingID st return domain.GetBindingSpec{}, apiresponses.NewFailureResponse(errors.New("failed to retrieve scaling policy"), http.StatusInternalServerError, "get-policy") } + var combinedConfig *models.BindingConfigWithScaling + if bindingConfig.GetCustomMetricsStrategy() != "" { + combinedConfig = &models.BindingConfigWithScaling{BindingConfig: *bindingConfig} + } if policy != nil { - result.Parameters = policy + areConfigAndPolicyPresent := combinedConfig != nil && policy.InstanceMin > 0 + if areConfigAndPolicyPresent { + combinedConfig.ScalingPolicy = *policy + result.Parameters = combinedConfig + } else { + result.Parameters = policy + } + } else if !b.isEmpty(bindingConfig) { + result.Parameters = bindingConfig } return result, nil } +func (b *Broker) isEmpty(bindingConfig *models.BindingConfig) bool { + return reflect.DeepEqual(bindingConfig, &models.BindingConfig{}) +} + func (b *Broker) getServiceBinding(ctx context.Context, bindingID string) (*models.ServiceBinding, error) { logger := b.logger.Session("get-service-binding", lager.Data{"bindingID": bindingID}) @@ -844,3 +885,10 @@ func (b *Broker) deleteBinding(ctx context.Context, bindingId string, serviceIns func isValidCredentialType(credentialType string) bool { return credentialType == models.BindingSecret || credentialType == models.X509Certificate } + +func createServiceBinding(ctx context.Context, bindingDB db.BindingDB, bindingID, instanceID, appGUID string, customMetricsStrategy string) error { + if customMetricsStrategy == models.CustomMetricsBoundApp || customMetricsStrategy == models.CustomMetricsSameApp { + return bindingDB.CreateServiceBinding(ctx, bindingID, instanceID, appGUID, customMetricsStrategy) + } + return ErrInvalidCustomMetricsStrategy +} diff --git a/src/autoscaler/api/broker/broker_test.go b/src/autoscaler/api/broker/broker_test.go index e9ffec9abd..ea0b98c476 100644 --- a/src/autoscaler/api/broker/broker_test.go +++ b/src/autoscaler/api/broker/broker_test.go @@ -2,6 +2,7 @@ package broker_test import ( "encoding/json" + "os" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/api/broker" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/db" @@ -19,12 +20,14 @@ import ( var _ = Describe("Broker", func() { var ( - aBroker *broker.Broker - err error - fakeBindingDB *fakes.FakeBindingDB - fakePolicyDB *fakes.FakePolicyDB - fakeCredentials *fakes.FakeCredentials - testLogger = lagertest.NewTestLogger("test") + aBroker *broker.Broker + err error + fakeBindingDB *fakes.FakeBindingDB + fakePolicyDB *fakes.FakePolicyDB + fakeCredentials *fakes.FakeCredentials + testLogger = lagertest.NewTestLogger("test") + bindingConfigWithScaling *models.BindingConfigWithScaling + bindingConfig *models.BindingConfig ) BeforeEach(func() { @@ -153,11 +156,10 @@ var _ = Describe("Broker", func() { }) }) Context("when the binding exists", func() { - BeforeEach(func() { - fakeBindingDB.GetServiceBindingReturns(&models.ServiceBinding{ServiceBindingID: testBindingId, ServiceInstanceID: testInstanceId, AppID: testAppId}, nil) - }) Context("without policy", func() { BeforeEach(func() { + fakeBindingDB.GetServiceBindingReturns(&models.ServiceBinding{ServiceBindingID: testBindingId, + ServiceInstanceID: testInstanceId, AppID: testAppId}, nil) fakePolicyDB.GetAppPolicyReturns(nil, nil) }) It("returns the empty binding without parameters", func() { @@ -175,6 +177,8 @@ var _ = Describe("Broker", func() { }) Context("with policy", func() { BeforeEach(func() { + fakeBindingDB.GetServiceBindingReturns(&models.ServiceBinding{ServiceBindingID: testBindingId, + ServiceInstanceID: testInstanceId, AppID: testAppId}, nil) fakePolicyDB.GetAppPolicyReturns(scalingPolicy, nil) }) It("returns the Binding with parameters", func() { @@ -182,6 +186,46 @@ var _ = Describe("Broker", func() { Expect(Binding).To(Equal(domain.GetBindingSpec{Parameters: scalingPolicy})) }) }) + Context("with configuration and policy", func() { + BeforeEach(func() { + bindingConfig = &models.BindingConfig{Configuration: models.Configuration{CustomMetrics: models.CustomMetricsConfig{ + MetricSubmissionStrategy: models.MetricsSubmissionStrategy{AllowFrom: "bound_app"}, + }, + }} + fakeBindingDB.GetServiceBindingReturns(&models.ServiceBinding{ServiceBindingID: testBindingId, + ServiceInstanceID: testInstanceId, AppID: testAppId, CustomMetricsStrategy: "bound_app"}, nil) + bindingBytes, err := os.ReadFile("testdata/policy-with-configs.json") + Expect(err).ShouldNot(HaveOccurred()) + + err = json.Unmarshal(bindingBytes, &bindingConfigWithScaling) + Expect(err).ShouldNot(HaveOccurred()) + fakePolicyDB.GetAppPolicyReturns(scalingPolicy, nil) + }) + It("returns the Binding with configs and policy in parameters", func() { + Expect(err).To(BeNil()) + Expect(Binding).To(Equal(domain.GetBindingSpec{Parameters: bindingConfigWithScaling})) + }) + }) + Context("with configuration only", func() { + BeforeEach(func() { + bindingConfig = &models.BindingConfig{Configuration: models.Configuration{CustomMetrics: models.CustomMetricsConfig{ + MetricSubmissionStrategy: models.MetricsSubmissionStrategy{AllowFrom: "bound_app"}, + }, + }} + fakeBindingDB.GetServiceBindingReturns(&models.ServiceBinding{ServiceBindingID: testBindingId, + ServiceInstanceID: testInstanceId, AppID: testAppId, CustomMetricsStrategy: "bound_app"}, nil) + bindingBytes, err := os.ReadFile("testdata/with-configs.json") + Expect(err).ShouldNot(HaveOccurred()) + + err = json.Unmarshal(bindingBytes, &bindingConfigWithScaling) + Expect(err).ShouldNot(HaveOccurred()) + fakePolicyDB.GetAppPolicyReturns(nil, nil) + }) + It("returns the bindings with configs in parameters", func() { + Expect(err).To(BeNil()) + Expect(Binding).To(Equal(domain.GetBindingSpec{Parameters: bindingConfig})) + }) + }) }) }) diff --git a/src/autoscaler/api/broker/testdata/policy-with-configs.json b/src/autoscaler/api/broker/testdata/policy-with-configs.json new file mode 100644 index 0000000000..3b2c08934a --- /dev/null +++ b/src/autoscaler/api/broker/testdata/policy-with-configs.json @@ -0,0 +1,19 @@ +{ + "configuration": { + "custom_metrics": { + "metric_submission_strategy": { + "allow_from": "bound_app" + } + } + }, + "instance_min_count": 1, + "instance_max_count": 5, + "scaling_rules": [ + { + "metric_type": "memoryused", + "threshold": 30, + "operator": "<", + "adjustment": "-1" + } + ] +} diff --git a/src/autoscaler/api/broker/testdata/with-configs.json b/src/autoscaler/api/broker/testdata/with-configs.json new file mode 100644 index 0000000000..6e51c6d47b --- /dev/null +++ b/src/autoscaler/api/broker/testdata/with-configs.json @@ -0,0 +1,9 @@ +{ + "configuration": { + "custom_metrics": { + "metric_submission_strategy": { + "allow_from": "bound_app" + } + } + } +} diff --git a/src/autoscaler/api/brokerserver/broker_handler_test.go b/src/autoscaler/api/brokerserver/broker_handler_test.go index a92889867d..b6d3bd6a98 100644 --- a/src/autoscaler/api/brokerserver/broker_handler_test.go +++ b/src/autoscaler/api/brokerserver/broker_handler_test.go @@ -920,15 +920,87 @@ var _ = Describe("BrokerHandler", func() { Expect(schedulerServer.ReceivedRequests()).To(HaveLen(1)) }) It("returns the correct binding parameters", func() { - creds := &models.CredentialResponse{} - responseString := resp.Body.String() - err := json.Unmarshal([]byte(responseString), creds) + verifyCredentialsGenerated(resp) + }) + }) + When("Binding configurations are present", func() { + BeforeEach(func() { + bindingPolicy = `{ + "configuration": { + "custom_metrics": { + "auth": { + "credential_type": "binding_secret" + }, + "metric_submission_strategy": { + "allow_from": "bound_app" + } + } + }, + "instance_max_count":4, + "instance_min_count":1, + "schedules": { + "timezone": "Asia/Shanghai", + "recurring_schedule": [{ + "start_time": "10:00", + "end_time": "18:00", + "days_of_week": [ + 1, + 2, + 3 + ], + "instance_min_count": 1, + "instance_max_count": 10, + "initial_min_instance_count": 5 + }] + }, + "scaling_rules":[ + { + "metric_type":"memoryused", + "threshold":30, + "operator":"<", + "adjustment":"-1" + }] + }` + bindingRequestBody.Policy = json.RawMessage(bindingPolicy) + body, err = json.Marshal(bindingRequestBody) Expect(err).NotTo(HaveOccurred()) - Expect(*creds.Credentials.CustomMetrics.URL).To(Equal("someURL")) - Expect(creds.Credentials.CustomMetrics.MtlsUrl).To(Equal("Mtls-someURL")) + bindingPolicy = `{ + "instance_max_count":4, + "instance_min_count":1, + "schedules": { + "timezone": "Asia/Shanghai", + "recurring_schedule": [{ + "start_time": "10:00", + "end_time": "18:00", + "days_of_week": [ + 1, + 2, + 3 + ], + "instance_min_count": 1, + "instance_max_count": 10, + "initial_min_instance_count": 5 + }] + }, + "scaling_rules":[ + { + "metric_type":"memoryused", + "threshold":30, + "operator":"<", + "adjustment":"-1" + }] + }` + verifyScheduleIsUpdatedInScheduler(testAppId, bindingPolicy) + }) + It("succeeds with 201", func() { + Expect(resp.Code).To(Equal(http.StatusCreated)) + By("updating the scheduler") + Expect(schedulerServer.ReceivedRequests()).To(HaveLen(1)) + Expect(bindingdb.CreateServiceBindingCallCount()).To(Equal(1)) + verifyCredentialsGenerated(resp) }) }) - // test for credential-type + Context("credential-type is provided while binding", func() { BeforeEach(func() { schedulerExpectedJSON = `{ @@ -1188,7 +1260,6 @@ var _ = Describe("BrokerHandler", func() { }) }) - // Context("When a default policy was provided when creating the service instance", func() { BeforeEach(func() { bindingdb.GetServiceInstanceReturns(&models.ServiceInstance{testInstanceId, testOrgId, testSpaceId, testDefaultPolicy, testDefaultGuid}, nil) @@ -1429,8 +1500,58 @@ var _ = Describe("BrokerHandler", func() { }) }) }) + + Describe("GetBinding", func() { + var ( + err error + bindingPolicy string + ) + + JustBeforeEach(func() { + req, err = http.NewRequest(http.MethodGet, "", nil) + Expect(err).NotTo(HaveOccurred()) + handler.GetBinding(resp, req) + }) + + Context("Binding configurations are exist", func() { + BeforeEach(func() { + bindingPolicy = `{ + "configuration": { + "custom_metrics": { + "metric_submission_strategy": { + "allow_from": "bound_app" + } + } + }, + "instance_max_count":4, + "instance_min_count":1, + "scaling_rules":[ + { + "metric_type":"memoryused", + "threshold":30, + "operator":"<", + "adjustment":"-1" + }] + }` + Expect(bindingPolicy).NotTo(BeEmpty()) + + }) + It("succeeds with 200", func() { + Expect(resp.Code).To(Equal(http.StatusPreconditionFailed)) + }) + }) + }) }) +func verifyCredentialsGenerated(resp *httptest.ResponseRecorder) { + creds := &models.CredentialResponse{} + responseString := resp.Body.String() + err := json.Unmarshal([]byte(responseString), creds) + Expect(err).NotTo(HaveOccurred()) + Expect(*creds.Credentials.CustomMetrics.URL).To(Equal("someURL")) + Expect(creds.Credentials.CustomMetrics.MtlsUrl).To(Equal("Mtls-someURL")) +} + func createInstanceCreationRequestBody(defaultPolicy string) []byte { m := json.RawMessage(defaultPolicy) instanceCreationReqBody := &models.InstanceCreationRequestBody{ diff --git a/src/autoscaler/api/db/servicebroker.db.changelog.yaml b/src/autoscaler/api/db/servicebroker.db.changelog.yaml index 0a7a80c466..62f3a831e4 100644 --- a/src/autoscaler/api/db/servicebroker.db.changelog.yaml +++ b/src/autoscaler/api/db/servicebroker.db.changelog.yaml @@ -115,3 +115,75 @@ databaseChangeLog: - column: name: default_policy_guid type: varchar(50) + - changeSet: + id: 5 + author: Arsalan + logicalFilePath: /var/vcap/packages/golangapiserver/servicebroker.db.changelog.yaml + preConditions: + - onFail: MARK_RAN + not: + - columnExists: + tableName: binding + columnName: custom_metrics_strategy + changes: + - addColumn: + tableName: binding + columns: + - column: + name: custom_metrics_strategy + type: varchar(40) + constraints: + nullable: false + - changeSet: + id: 6 + author: Arsalan + logicalFilePath: /var/vcap/packages/golangapiserver/servicebroker.db.changelog.json + preConditions: + - onFail: MARK_RAN + not: + - tableExists: + tableName: metrics_submission + changes: + - createTable: + tableName: metrics_submission + columns: + - column: + name: custom_metrics_strategy + type: varchar(40) + constraints: + primaryKey: true + nullable: false + - insert: + tableName: metrics_submission + columns: + - column: + name: custom_metrics_strategy + value: 'bound_app' + - insert: + tableName: metrics_submission + columns: + - column: + name: custom_metrics_strategy + value: 'same_app' + - changeSet: + id: 7 + author: Arsalan + logicalFilePath: /var/vcap/packages/golangapiserver/servicebroker.db.changelog.json + preConditions: + - onFail: MARK_RAN + not: + - foreignKeyConstraintExists: + foreignKeyName: fk_binding_custom_metrics_strategy + changes: + - addForeignKeyConstraint: + baseColumnNames: custom_metrics_strategy + baseTableName: binding + constraintName: fk_binding_custom_metrics_strategy + deferrable: false + initiallyDeferred: false + onDelete: RESTRICT + onUpdate: RESTRICT + referencedColumnNames: custom_metrics_strategy + referencedTableName: metrics_submission + + diff --git a/src/autoscaler/db/db.go b/src/autoscaler/db/db.go index 0b78eac1dd..de20e54781 100644 --- a/src/autoscaler/db/db.go +++ b/src/autoscaler/db/db.go @@ -69,7 +69,7 @@ type BindingDB interface { GetServiceInstanceByAppId(appId string) (*models.ServiceInstance, error) UpdateServiceInstance(ctx context.Context, serviceInstance models.ServiceInstance) error DeleteServiceInstance(ctx context.Context, serviceInstanceId string) error - CreateServiceBinding(ctx context.Context, bindingId string, serviceInstanceId string, appId string) error + CreateServiceBinding(ctx context.Context, bindingId string, serviceInstanceId string, appId string, customMetricsStrategy string) error DeleteServiceBinding(ctx context.Context, bindingId string) error DeleteServiceBindingByAppId(ctx context.Context, appId string) error CheckServiceBinding(appId string) bool @@ -78,6 +78,9 @@ type BindingDB interface { CountServiceInstancesInOrg(orgId string) (int, error) GetServiceBinding(ctx context.Context, serviceBindingId string) (*models.ServiceBinding, error) GetBindingIdsByInstanceId(ctx context.Context, instanceId string) ([]string, error) + GetAppBindingByAppId(ctx context.Context, appId string) (string, error) + IsAppBoundToSameAutoscaler(ctx context.Context, appId string, appToScaleId string) (bool, error) + GetCustomMetricStrategyByAppId(ctx context.Context, appId string) (string, error) } type AppMetricDB interface { diff --git a/src/autoscaler/db/sqldb/binding_sqldb.go b/src/autoscaler/db/sqldb/binding_sqldb.go index 759ace0979..8345a1b8da 100644 --- a/src/autoscaler/db/sqldb/binding_sqldb.go +++ b/src/autoscaler/db/sqldb/binding_sqldb.go @@ -131,19 +131,27 @@ func (bdb *BindingSQLDB) GetServiceInstance(ctx context.Context, serviceInstance } func (bdb *BindingSQLDB) GetServiceInstanceByAppId(appId string) (*models.ServiceInstance, error) { + serviceInstanceId, err := bdb.GetServiceInstanceIdByAppId(appId) + if err != nil { + bdb.logger.Error("get-service-instance-for-app-id", err, lager.Data{"appId": appId}) + return nil, err + } + return bdb.GetServiceInstance(context.Background(), serviceInstanceId) +} + +func (bdb *BindingSQLDB) GetServiceInstanceIdByAppId(appId string) (string, error) { query := bdb.sqldb.Rebind("SELECT service_instance_id FROM binding WHERE app_id = ?") serviceInstanceId := "" err := bdb.sqldb.Get(&serviceInstanceId, query, appId) if err != nil { - bdb.logger.Error("get-service-binding-for-app-id", err, lager.Data{"query": query, "appId": appId}) + bdb.logger.Error("get-service-instance-for-app-id", err, lager.Data{"query": query, "appId": appId}) if errors.Is(err, sql.ErrNoRows) { - return nil, db.ErrDoesNotExist + return serviceInstanceId, db.ErrDoesNotExist } - return nil, err + return serviceInstanceId, err } - - return bdb.GetServiceInstance(context.Background(), serviceInstanceId) + return serviceInstanceId, nil } func (bdb *BindingSQLDB) UpdateServiceInstance(ctx context.Context, serviceInstance models.ServiceInstance) error { @@ -192,11 +200,28 @@ func (bdb *BindingSQLDB) DeleteServiceInstance(ctx context.Context, serviceInsta return db.ErrDoesNotExist } -func (bdb *BindingSQLDB) CreateServiceBinding(ctx context.Context, bindingId string, serviceInstanceId string, appId string) error { +func (bdb *BindingSQLDB) CreateServiceBinding(ctx context.Context, bindingId string, serviceInstanceId string, appId string, customMetricsStrategy string) error { + err := bdb.isBindingExists(ctx, bindingId, serviceInstanceId, appId) + if err != nil { + return err + } + query := bdb.sqldb.Rebind("INSERT INTO binding" + + "(binding_id, service_instance_id, app_id, created_at, custom_metrics_strategy) " + + "VALUES(?, ?, ?, ?,?)") + _, err = bdb.sqldb.ExecContext(ctx, query, bindingId, serviceInstanceId, appId, time.Now(), customMetricsStrategy) + + if err != nil { + bdb.logger.Error("create-service-binding", err, lager.Data{"query": query, "serviceInstanceId": serviceInstanceId, "bindingId": bindingId, "appId": appId, "customMetricsStrategy": customMetricsStrategy}) + return err + } + return nil +} + +func (bdb *BindingSQLDB) isBindingExists(ctx context.Context, bindingId string, serviceInstanceId string, appId string) error { query := bdb.sqldb.Rebind("SELECT * FROM binding WHERE app_id =?") rows, err := bdb.sqldb.QueryContext(ctx, query, appId) if err != nil { - bdb.logger.Error("create-service-binding", err, lager.Data{"query": query, "appId": appId, "serviceId": serviceInstanceId, "bindingId": bindingId}) + bdb.logger.Error("is-binding-already-exists", err, lager.Data{"query": query, "appId": appId, "serviceId": serviceInstanceId, "bindingId": bindingId}) return err } @@ -208,19 +233,10 @@ func (bdb *BindingSQLDB) CreateServiceBinding(ctx context.Context, bindingId str err = rows.Err() if err != nil { - bdb.logger.Error("create-service-binding", err, lager.Data{"query": query, "appId": appId, "serviceId": serviceInstanceId, "bindingId": bindingId}) + bdb.logger.Error("is-binding-already-exists", err, lager.Data{"query": query, "appId": appId, "serviceId": serviceInstanceId, "bindingId": bindingId}) return err } - - query = bdb.sqldb.Rebind("INSERT INTO binding" + - "(binding_id, service_instance_id, app_id, created_at) " + - "VALUES(?, ?, ?, ?)") - _, err = bdb.sqldb.ExecContext(ctx, query, bindingId, serviceInstanceId, appId, time.Now()) - - if err != nil { - bdb.logger.Error("create-service-binding", err, lager.Data{"query": query, "serviceinstanceid": serviceInstanceId, "bindingid": bindingId, "appid": appId}) - } - return err + return nil } func (bdb *BindingSQLDB) GetServiceBinding(ctx context.Context, serviceBindingId string) (*models.ServiceBinding, error) { @@ -228,7 +244,7 @@ func (bdb *BindingSQLDB) GetServiceBinding(ctx context.Context, serviceBindingId serviceBinding := &models.ServiceBinding{} - query := bdb.sqldb.Rebind("SELECT binding_id, service_instance_id, app_id FROM binding WHERE binding_id =?") + query := bdb.sqldb.Rebind("SELECT binding_id, service_instance_id, app_id, custom_metrics_strategy FROM binding WHERE binding_id =?") err := bdb.sqldb.GetContext(ctx, serviceBinding, query, serviceBindingId) if err != nil { @@ -280,6 +296,18 @@ func (bdb *BindingSQLDB) DeleteServiceBindingByAppId(ctx context.Context, appId } return nil } + +func (bdb *BindingSQLDB) GetAppBindingByAppId(ctx context.Context, appId string) (string, error) { + var bindingId string + query := bdb.sqldb.Rebind("SELECT binding_id FROM binding WHERE app_id =?") + err := bdb.sqldb.QueryRowContext(ctx, query, appId).Scan(&bindingId) + + if err != nil { + bdb.logger.Error("get-service-binding-by-appid", err, lager.Data{"query": query, "appId": appId}) + return "", err + } + return bindingId, nil +} func (bdb *BindingSQLDB) CheckServiceBinding(appId string) bool { var count int query := bdb.sqldb.Rebind("SELECT COUNT(*) FROM binding WHERE app_id=?") @@ -359,3 +387,66 @@ func (bdb *BindingSQLDB) GetBindingIdsByInstanceId(ctx context.Context, instance return bindingIds, rows.Err() } + +func (bdb *BindingSQLDB) IsAppBoundToSameAutoscaler(ctx context.Context, metricSubmitterAppId string, appToScaleId string) (bool, error) { + serviceInstanceId, err := bdb.GetServiceInstanceIdByAppId(metricSubmitterAppId) + if err != nil { + bdb.logger.Error("get-service-instance-by-appId", err, lager.Data{"appId": metricSubmitterAppId}) + return false, err + } + if serviceInstanceId == "" { + bdb.logger.Error("no-service-instance-found-by-appId", err, lager.Data{"appId": metricSubmitterAppId, "serviceInstanceId": serviceInstanceId}) + return false, nil + } + // find all apps which are bound to the same service instance + appIds, err := bdb.GetAppIdsByInstanceId(ctx, serviceInstanceId) + if err != nil { + bdb.logger.Error("get-apps-by-service-instance-id", err, lager.Data{"serviceInstanceId": serviceInstanceId}) + return false, err + } + + if len(appIds) == 0 { + bdb.logger.Error("no-apps-bounded-with-serviceInstance", err, lager.Data{"serviceInstanceId": serviceInstanceId}) + return false, nil + } + // check if the app to scale is in the list of apps bound to the same service instance and return true .otherwise return false + for _, app := range appIds { + if app == appToScaleId { + return true, nil + } + } + return false, nil +} + +func (bdb *BindingSQLDB) GetCustomMetricStrategyByAppId(ctx context.Context, appId string) (string, error) { + customMetricsStrategy, err := bdb.fetchCustomMetricStrategyByAppId(ctx, appId) + if err != nil { + return "", err + } + return customMetricsStrategy, nil +} + +func (bdb *BindingSQLDB) fetchCustomMetricStrategyByAppId(ctx context.Context, appId string) (string, error) { + var customMetricsStrategy sql.NullString + query := bdb.sqldb.Rebind("SELECT custom_metrics_strategy FROM binding WHERE app_id =?") + rows, err := bdb.sqldb.QueryContext(ctx, query, appId) + + if err != nil { + bdb.logger.Error("get-custom-metrics-strategy-by-appid", err, lager.Data{"query": query, "appId": appId}) + return "", err + } + defer func() { _ = rows.Close() }() + + if rows.Next() { + if err = rows.Scan(&customMetricsStrategy); err != nil { + bdb.logger.Error("error-finding-customMetricsStrategy-in-binding-table", err) + return "", err + } + } + err = rows.Err() + if err != nil { + bdb.logger.Error("error-finding-customMetricsStrategy-in-binding-table", err) + return "", err + } + return customMetricsStrategy.String, nil +} diff --git a/src/autoscaler/db/sqldb/binding_sqldb_test.go b/src/autoscaler/db/sqldb/binding_sqldb_test.go index 26f6472b1a..e509d0119b 100644 --- a/src/autoscaler/db/sqldb/binding_sqldb_test.go +++ b/src/autoscaler/db/sqldb/binding_sqldb_test.go @@ -61,13 +61,14 @@ var _ = Describe("BindingSqldb", func() { "adjustment":"+1" }] }` - policyGuid2 = addProcessIdTo("test-policy-guid-2") - testInstanceId2 = testInstanceId + "2" - testInstanceId3 = testInstanceId + "3" - testAppId2 = testAppId + "2" - testAppId3 = testAppId + "3" - testBindingId3 = testBindingId + "3" - testBindingId2 = testBindingId + "2" + policyGuid2 = addProcessIdTo("test-policy-guid-2") + testInstanceId2 = testInstanceId + "2" + testInstanceId3 = testInstanceId + "3" + testAppId2 = testAppId + "2" + testAppId3 = testAppId + "3" + testBindingId3 = testBindingId + "3" + testBindingId2 = testBindingId + "2" + customMetricsStrategy = "same_app" ) dbUrl := testhelpers.GetDbUrl() @@ -343,7 +344,7 @@ var _ = Describe("BindingSqldb", func() { BeforeEach(func() { err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "same_app") }) It("should return what was created", func() { expectServiceInstancesToEqual(retrievedServiceInstance, &models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) @@ -352,8 +353,10 @@ var _ = Describe("BindingSqldb", func() { }) Describe("CreateServiceBinding", func() { + JustBeforeEach(func() { - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, customMetricsStrategy) }) Context("When service instance doesn't exist", func() { It("should error", func() { @@ -366,6 +369,7 @@ var _ = Describe("BindingSqldb", func() { BeforeEach(func() { err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) Expect(err).NotTo(HaveOccurred()) + customMetricsStrategy = "same_app" }) Context("When service binding is being created first time", func() { @@ -376,12 +380,39 @@ var _ = Describe("BindingSqldb", func() { }) Context("When service binding already exists", func() { It("should error", func() { - err := bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "same_app") Expect(err).To(HaveOccurred()) Expect(err).To(Equal(db.ErrAlreadyExists)) }) }) + Context("When service binding is created with custom metrics strategy 'bound_app'", func() { + BeforeEach(func() { + customMetricsStrategy = "bound_app" + }) + It("should succeed", func() { + Expect(err).NotTo(HaveOccurred()) + Expect(hasServiceBindingWithCustomMetricStrategy(testBindingId, testInstanceId, customMetricsStrategy)).To(BeTrue()) + }) + }) + Context("When service binding is created with custom metrics strategy 'same_app'", func() { + BeforeEach(func() { + customMetricsStrategy = "same_app" + }) + It("should succeed", func() { + Expect(err).NotTo(HaveOccurred()) + Expect(hasServiceBindingWithCustomMetricStrategy(testBindingId, testInstanceId, customMetricsStrategy)).To(BeTrue()) + }) + }) + When("service binding is created with invalid custom metrics strategy", func() { + BeforeEach(func() { + customMetricsStrategy = "" + }) + It("should throw an error with foreign key violation", func() { + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("foreign key constraint")) + }) + }) }) }) @@ -402,15 +433,16 @@ var _ = Describe("BindingSqldb", func() { BeforeEach(func() { err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "same_app") Expect(err).NotTo(HaveOccurred()) }) It("should return what was created", func() { Expect(err).NotTo(HaveOccurred()) Expect(retrievedServiceBinding).To(Equal(&models.ServiceBinding{ - ServiceBindingID: testBindingId, - ServiceInstanceID: testInstanceId, - AppID: testAppId, + ServiceBindingID: testBindingId, + ServiceInstanceID: testInstanceId, + AppID: testAppId, + CustomMetricsStrategy: "same_app", })) }) }) @@ -440,7 +472,7 @@ var _ = Describe("BindingSqldb", func() { }) Context("When service binding is present", func() { BeforeEach(func() { - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "same_app") Expect(err).NotTo(HaveOccurred()) }) It("should succeed", func() { @@ -456,7 +488,7 @@ var _ = Describe("BindingSqldb", func() { BeforeEach(func() { err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "same_app") Expect(err).NotTo(HaveOccurred()) err = bdb.DeleteServiceBindingByAppId(context.Background(), testAppId) }) @@ -475,7 +507,7 @@ var _ = Describe("BindingSqldb", func() { BeforeEach(func() { err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "same_app") Expect(err).NotTo(HaveOccurred()) }) It("should return true", func() { @@ -501,7 +533,7 @@ var _ = Describe("BindingSqldb", func() { BeforeEach(func() { err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "same_app") Expect(err).NotTo(HaveOccurred()) }) It("should succeed", func() { @@ -526,15 +558,15 @@ var _ = Describe("BindingSqldb", func() { BeforeEach(func() { err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "same_app") Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId2, testInstanceId, testAppId2) + err = bdb.CreateServiceBinding(context.Background(), testBindingId2, testInstanceId, testAppId2, "same_app") Expect(err).NotTo(HaveOccurred()) // other unrelated service instance with bindings err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId3, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId3, testInstanceId3, testAppId3) + err = bdb.CreateServiceBinding(context.Background(), testBindingId3, testInstanceId3, testAppId3, "same_app") Expect(err).NotTo(HaveOccurred()) }) It("should succeed", func() { @@ -599,17 +631,17 @@ var _ = Describe("BindingSqldb", func() { err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{testInstanceId, testOrgGuid, testSpaceGuid, policyJsonStr, policyGuid}) Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("CreateServiceInstance, failed: testInstanceId %s procId %d", testInstanceId, GinkgoParallelProcess())) - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "same_app") Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId2, testInstanceId, testAppId2) + err = bdb.CreateServiceBinding(context.Background(), testBindingId2, testInstanceId, testAppId2, "same_app") Expect(err).NotTo(HaveOccurred()) // other unrelated service instance with bindings err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{testInstanceId3, testOrgGuid, testSpaceGuid, policyJsonStr, policyGuid}) Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId3, testInstanceId3, testAppId3) + err = bdb.CreateServiceBinding(context.Background(), testBindingId3, testInstanceId3, testAppId3, "same_app") Expect(err).NotTo(HaveOccurred()) }) @@ -625,6 +657,72 @@ var _ = Describe("BindingSqldb", func() { }) }) }) + + Describe("isAppBoundToSameAutoscaler", func() { + var isTestApp1Bounded bool + JustBeforeEach(func() { + isTestApp1Bounded, _ = bdb.IsAppBoundToSameAutoscaler(context.Background(), testAppId, testAppId2) + Expect(err).NotTo(HaveOccurred()) + }) + When("apps are bounded to same autoscaler instance", func() { + BeforeEach(func() { + err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) + Expect(err).NotTo(HaveOccurred()) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "same_app") + Expect(err).NotTo(HaveOccurred()) + err = bdb.CreateServiceBinding(context.Background(), testBindingId2, testInstanceId, testAppId2, "same_app") + Expect(err).NotTo(HaveOccurred()) + }) + It("should return true", func() { + Expect(isTestApp1Bounded).To(BeTrue()) + }) + }) + Context("when neighbouring app is bounded to different autoscaler instance", func() { + BeforeEach(func() { + err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) + Expect(err).NotTo(HaveOccurred()) + err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId2, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) + Expect(err).NotTo(HaveOccurred()) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "same_app") + Expect(err).NotTo(HaveOccurred()) + err = bdb.CreateServiceBinding(context.Background(), testBindingId2, testInstanceId2, testAppId2, "same_app") + Expect(err).NotTo(HaveOccurred()) + }) + It("should return false", func() { + Expect(err).NotTo(HaveOccurred()) + Expect(isTestApp1Bounded).To(BeFalse()) + }) + }) + + }) + + Describe("GetCustomMetricStrategyByAppId", func() { + BeforeEach(func() { + err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) + Expect(err).NotTo(HaveOccurred()) + }) + Context("When service instance and binding exists with custom metrics strategy 'bound_app'", func() { + BeforeEach(func() { + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "bound_app") + Expect(err).NotTo(HaveOccurred()) + }) + It("should get the custom metrics strategy from the database", func() { + customMetricStrategy, _ := bdb.GetCustomMetricStrategyByAppId(context.Background(), testAppId) + Expect(customMetricStrategy).To(Equal("bound_app")) + }) + }) + Context("When service instance and binding exists with custom metrics strategy 'same_app'", func() { + BeforeEach(func() { + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "same_app") + Expect(err).NotTo(HaveOccurred()) + }) + It("should get the custom metrics strategy from the database", func() { + customMetricStrategy, _ := bdb.GetCustomMetricStrategyByAppId(context.Background(), testAppId) + Expect(customMetricStrategy).To(Equal("same_app")) + }) + }) + + }) }) func addProcessIdTo(id string) string { diff --git a/src/autoscaler/db/sqldb/factories.go b/src/autoscaler/db/sqldb/factories.go index 759d5da2dc..00d195a30c 100644 --- a/src/autoscaler/db/sqldb/factories.go +++ b/src/autoscaler/db/sqldb/factories.go @@ -15,3 +15,12 @@ func CreatePolicyDb(dbConf db.DatabaseConfig, logger lager.Logger) *PolicySQLDB } return policyDB } + +func CreateBindingDB(dbConf db.DatabaseConfig, logger lager.Logger) *BindingSQLDB { + bindingDB, err := NewBindingSQLDB(dbConf, logger.Session("binding-db")) + if err != nil { + logger.Fatal("Failed To connect to bindingDB", err, lager.Data{"dbConfig": dbConf}) + os.Exit(1) + } + return bindingDB +} diff --git a/src/autoscaler/db/sqldb/sqldb_suite_test.go b/src/autoscaler/db/sqldb/sqldb_suite_test.go index 8a5c43f258..929d9fe948 100644 --- a/src/autoscaler/db/sqldb/sqldb_suite_test.go +++ b/src/autoscaler/db/sqldb/sqldb_suite_test.go @@ -117,6 +117,16 @@ func hasServiceBinding(bindingId string, serviceInstanceId string) bool { return item } +func hasServiceBindingWithCustomMetricStrategy(bindingId string, serviceInstanceId string, strategy string) bool { + query := dbHelper.Rebind("SELECT * FROM binding WHERE binding_id = ? AND service_instance_id = ? AND custom_metrics_strategy = ?") + rows, e := dbHelper.Query(query, bindingId, serviceInstanceId, strategy) + FailOnError("can not query table binding", e) + defer func() { _ = rows.Close() }() + item := rows.Next() + FailOnError("can not query table binding", rows.Err()) + return item +} + func cleanPolicyTable() { _, e := dbHelper.Exec("DELETE from policy_json") if e != nil { diff --git a/src/autoscaler/metricsforwarder/cmd/metricsforwarder/main.go b/src/autoscaler/metricsforwarder/cmd/metricsforwarder/main.go index 4b92530cd7..0bea46f93d 100644 --- a/src/autoscaler/metricsforwarder/cmd/metricsforwarder/main.go +++ b/src/autoscaler/metricsforwarder/cmd/metricsforwarder/main.go @@ -59,13 +59,16 @@ func main() { policyDb := sqldb.CreatePolicyDb(conf.Db[db.PolicyDb], logger) defer func() { _ = policyDb.Close() }() + bindingDB := sqldb.CreateBindingDB(conf.Db[db.BindingDb], logger) + defer func() { _ = bindingDB.Close() }() + credentialProvider := cred_helper.CredentialsProvider(conf.CredHelperImpl, conf.StoredProcedureConfig, conf.Db, conf.CacheTTL, conf.CacheCleanupInterval, logger, policyDb) defer func() { _ = credentialProvider.Close() }() httpStatusCollector := healthendpoint.NewHTTPStatusCollector("autoscaler", "metricsforwarder") allowedMetricCache := cache.New(conf.CacheTTL, conf.CacheCleanupInterval) - customMetricsServer := createCustomMetricsServer(conf, logger, policyDb, credentialProvider, allowedMetricCache, httpStatusCollector) + customMetricsServer := createCustomMetricsServer(conf, logger, policyDb, bindingDB, credentialProvider, allowedMetricCache, httpStatusCollector) cacheUpdater := cacheUpdater(logger, mfClock, conf, policyDb, allowedMetricCache) members := grouper.Members{ @@ -97,9 +100,9 @@ func cacheUpdater(logger lager.Logger, mfClock clock.Clock, conf *config.Config, return cacheUpdater } -func createCustomMetricsServer(conf *config.Config, logger lager.Logger, policyDB *sqldb.PolicySQLDB, credentialProvider cred_helper.Credentials, allowedMetricCache *cache.Cache, httpStatusCollector healthendpoint.HTTPStatusCollector) ifrit.Runner { +func createCustomMetricsServer(conf *config.Config, logger lager.Logger, policyDB *sqldb.PolicySQLDB, bindingDB *sqldb.BindingSQLDB, credentialProvider cred_helper.Credentials, allowedMetricCache *cache.Cache, httpStatusCollector healthendpoint.HTTPStatusCollector) ifrit.Runner { rateLimiter := ratelimiter.DefaultRateLimiter(conf.RateLimit.MaxAmount, conf.RateLimit.ValidDuration, logger.Session("metricforwarder-ratelimiter")) - httpServer, err := server.NewServer(logger.Session("custom_metrics_server"), conf, policyDB, credentialProvider, *allowedMetricCache, httpStatusCollector, rateLimiter) + httpServer, err := server.NewServer(logger.Session("custom_metrics_server"), conf, policyDB, bindingDB, credentialProvider, *allowedMetricCache, httpStatusCollector, rateLimiter) if err != nil { logger.Fatal("Failed to create client to custom metrics server", err) os.Exit(1) diff --git a/src/autoscaler/metricsforwarder/cmd/metricsforwarder/metricsforwarder_suite_test.go b/src/autoscaler/metricsforwarder/cmd/metricsforwarder/metricsforwarder_suite_test.go index fb67b8ce82..3b385b8174 100644 --- a/src/autoscaler/metricsforwarder/cmd/metricsforwarder/metricsforwarder_suite_test.go +++ b/src/autoscaler/metricsforwarder/cmd/metricsforwarder/metricsforwarder_suite_test.go @@ -63,53 +63,8 @@ var _ = SynchronizedBeforeSuite(func() []byte { if err != nil { AbortSuite(fmt.Sprintf("DBURL not found: %s", err.Error())) } - - policyDB, err := sqlx.Open(database.DriverName, database.DataSourceName) - Expect(err).NotTo(HaveOccurred()) - - _, err = policyDB.Exec("DELETE from policy_json") - if err != nil { - AbortSuite(fmt.Sprintf("Failed clean policy_json %s", err.Error())) - } - _, err = policyDB.Exec("DELETE from credentials") - if err != nil { - AbortSuite(fmt.Sprintf("Failed clean credentials %s", err.Error())) - } - - policy := ` - { - "instance_min_count": 1, - "instance_max_count": 5, - "scaling_rules":[ - { - "metric_type":"custom", - "breach_duration_secs":600, - "threshold":30, - "operator":"<", - "cool_down_secs":300, - "adjustment":"-1" - } - ] - }` - query := policyDB.Rebind("INSERT INTO policy_json(app_id, policy_json, guid) values(?, ?, ?)") - _, err = policyDB.Exec(query, "an-app-id", policy, "1234") - if err != nil { - AbortSuite(fmt.Sprintf("Failed clean credentials %s", err.Error())) - } - - encryptedUsername, _ := bcrypt.GenerateFromPassword([]byte(username), 8) - encryptedPassword, _ := bcrypt.GenerateFromPassword([]byte(password), 8) - - query = policyDB.Rebind("INSERT INTO credentials(id, username, password, updated_at) values(?, ?, ?, ?)") - _, err = policyDB.Exec(query, "an-app-id", encryptedUsername, encryptedPassword, "2011-06-18 15:36:38") - if err != nil { - AbortSuite(fmt.Sprintf("Failed to add credentials: %s", err.Error())) - } - - err = policyDB.Close() - if err != nil { - AbortSuite(fmt.Sprintf("Failed to close connection: %s", err.Error())) - } + preparePolicyDb(database) + prepareBindingDb(database) return []byte(mf) }, func(pathsByte []byte) { @@ -153,6 +108,12 @@ var _ = SynchronizedBeforeSuite(func() []byte { MaxIdleConnections: 5, ConnectionMaxLifetime: 10 * time.Second, } + cfg.Db[db.BindingDb] = db.DatabaseConfig{ + URL: dbUrl, + MaxOpenConnections: 10, + MaxIdleConnections: 5, + ConnectionMaxLifetime: 10 * time.Second, + } cfg.CredHelperImpl = "default" @@ -162,6 +123,69 @@ var _ = SynchronizedBeforeSuite(func() []byte { healthHttpClient = &http.Client{} }) +func preparePolicyDb(database *db.Database) { + policyDB, err := sqlx.Open(database.DriverName, database.DataSourceName) + Expect(err).NotTo(HaveOccurred()) + + _, err = policyDB.Exec("DELETE from policy_json") + if err != nil { + AbortSuite(fmt.Sprintf("Failed clean policy_json %s", err.Error())) + } + _, err = policyDB.Exec("DELETE from credentials") + if err != nil { + AbortSuite(fmt.Sprintf("Failed clean credentials %s", err.Error())) + } + + policy := ` + { + "instance_min_count": 1, + "instance_max_count": 5, + "scaling_rules":[ + { + "metric_type":"custom", + "breach_duration_secs":600, + "threshold":30, + "operator":"<", + "cool_down_secs":300, + "adjustment":"-1" + } + ] + }` + query := policyDB.Rebind("INSERT INTO policy_json(app_id, policy_json, guid) values(?, ?, ?)") + _, err = policyDB.Exec(query, "an-app-id", policy, "1234") + if err != nil { + AbortSuite(fmt.Sprintf("Failed clean credentials %s", err.Error())) + } + + encryptedUsername, _ := bcrypt.GenerateFromPassword([]byte(username), 8) + encryptedPassword, _ := bcrypt.GenerateFromPassword([]byte(password), 8) + + query = policyDB.Rebind("INSERT INTO credentials(id, username, password, updated_at) values(?, ?, ?, ?)") + _, err = policyDB.Exec(query, "an-app-id", encryptedUsername, encryptedPassword, "2011-06-18 15:36:38") + if err != nil { + AbortSuite(fmt.Sprintf("Failed to add credentials: %s", err.Error())) + } + + err = policyDB.Close() + if err != nil { + AbortSuite(fmt.Sprintf("Failed to close connection: %s", err.Error())) + } +} + +func prepareBindingDb(database *db.Database) { + bindingDB, err := sqlx.Open(database.DriverName, database.DataSourceName) + Expect(err).NotTo(HaveOccurred()) + + _, err = bindingDB.Exec("DELETE from binding") + if err != nil { + AbortSuite(fmt.Sprintf("Failed clean policy_json %s", err.Error())) + } + err = bindingDB.Close() + if err != nil { + AbortSuite(fmt.Sprintf("Failed to close connection: %s", err.Error())) + } +} + var _ = SynchronizedAfterSuite(func() { grpcIngressTestServer.Stop() os.Remove(configFile.Name()) diff --git a/src/autoscaler/metricsforwarder/config/config.go b/src/autoscaler/metricsforwarder/config/config.go index 6536d0b95e..ef46650f6c 100644 --- a/src/autoscaler/metricsforwarder/config/config.go +++ b/src/autoscaler/metricsforwarder/config/config.go @@ -121,7 +121,11 @@ func loadVcapConfig(conf *Config, vcapReader configutil.VCAPConfigurationReader) conf.Db = make(map[string]db.DatabaseConfig) } - if err := configurePolicyDb(conf, vcapReader); err != nil { + if err := configureDb(db.PolicyDb, conf, vcapReader); err != nil { + return err + } + + if err := configureDb(db.BindingDb, conf, vcapReader); err != nil { return err } @@ -146,33 +150,27 @@ func loadMetricsforwarderConfig(conf *Config, vcapReader configutil.VCAPConfigur return yaml.Unmarshal(data, conf) } -func configurePolicyDb(conf *Config, vcapReader configutil.VCAPConfigurationReader) error { - currentPolicyDb, ok := conf.Db[db.PolicyDb] +func configureDb(dbName string, conf *Config, vcapReader configutil.VCAPConfigurationReader) error { + currentDb, ok := conf.Db[dbName] if !ok { - conf.Db[db.PolicyDb] = db.DatabaseConfig{} + conf.Db[dbName] = db.DatabaseConfig{} } - dbURL, err := vcapReader.MaterializeDBFromService(db.PolicyDb) - currentPolicyDb.URL = dbURL + dbURL, err := vcapReader.MaterializeDBFromService(dbName) + currentDb.URL = dbURL if err != nil { return err } - conf.Db[db.PolicyDb] = currentPolicyDb + conf.Db[dbName] = currentDb return nil } func configureStoredProcedureDb(conf *Config, vcapReader configutil.VCAPConfigurationReader) error { - currentStoredProcedureDb, exists := conf.Db[db.StoredProcedureDb] - if !exists { - conf.Db[db.StoredProcedureDb] = db.DatabaseConfig{} - } - - dbURL, err := vcapReader.MaterializeDBFromService(db.StoredProcedureDb) - if err != nil { + if err := configureDb(db.StoredProcedureDb, conf, vcapReader); err != nil { return err } - currentStoredProcedureDb.URL = dbURL + currentStoredProcedureDb := conf.Db[db.StoredProcedureDb] parsedUrl, err := url.Parse(currentStoredProcedureDb.URL) if err != nil { return err @@ -219,18 +217,22 @@ func (c *Config) Validate() error { func (c *Config) validateDbConfig() error { if c.Db[db.PolicyDb].URL == "" { - return errors.New("Policy DB url is empty") + return errors.New("configuration error: Policy DB url is empty") } - return nil + if c.Db[db.BindingDb].URL == "" { + return errors.New("configuration error: Binding DB url is empty") + } + if c.UsingSyslog() { + return c.validateSyslogConfig() + } + return c.validateLoggregatorConfig() } - func (c *Config) validateSyslogOrLoggregator() error { if c.UsingSyslog() { return c.validateSyslogConfig() } return c.validateLoggregatorConfig() } - func (c *Config) validateSyslogConfig() error { if c.SyslogConfig.TLS.CACertFile == "" { return errors.New("SyslogServer Loggregator CACert is empty") diff --git a/src/autoscaler/metricsforwarder/config/config_test.go b/src/autoscaler/metricsforwarder/config/config_test.go index f61b8d9d82..993dafe8fb 100644 --- a/src/autoscaler/metricsforwarder/config/config_test.go +++ b/src/autoscaler/metricsforwarder/config/config_test.go @@ -100,12 +100,27 @@ var _ = Describe("Config", func() { It("loads the db config from VCAP_SERVICES successfully", func() { Expect(err).NotTo(HaveOccurred()) Expect(conf.Db[db.PolicyDb].URL).To(Equal(expectedDbUrl)) - Expect(mockVCAPConfigurationReader.MaterializeDBFromServiceCallCount()).To(Equal(1)) + Expect(mockVCAPConfigurationReader.MaterializeDBFromServiceCallCount()).To(Equal(2)) actualDbName := mockVCAPConfigurationReader.MaterializeDBFromServiceArgsForCall(0) Expect(actualDbName).To(Equal(db.PolicyDb)) }) }) + When("VCAP_SERVICES has relational db service bind to app for policy db", func() { + BeforeEach(func() { + mockVCAPConfigurationReader.GetServiceCredentialContentReturns([]byte(`{ "cred_helper_impl": "default" }`), nil) // #nosec G101 + expectedDbUrl = "postgres://foo:bar@postgres.example.com:5432/policy_db?sslcert=%2Ftmp%2Fclient_cert.sslcert&sslkey=%2Ftmp%2Fclient_key.sslkey&sslrootcert=%2Ftmp%2Fserver_ca.sslrootcert" // #nosec G101 + }) + + It("loads the db config from VCAP_SERVICES successfully", func() { + Expect(err).NotTo(HaveOccurred()) + Expect(conf.Db[db.BindingDb].URL).To(Equal(expectedDbUrl)) + Expect(mockVCAPConfigurationReader.MaterializeDBFromServiceCallCount()).To(Equal(2)) + actualDbName := mockVCAPConfigurationReader.MaterializeDBFromServiceArgsForCall(1) + Expect(actualDbName).To(Equal(db.BindingDb)) + }) + }) + When("storedProcedure_db service is provided and cred_helper_impl is stored_procedure", func() { BeforeEach(func() { mockVCAPConfigurationReader.GetServiceCredentialContentReturns([]byte(`{ "cred_helper_impl": "stored_procedure" }`), nil) // #nosec G101 @@ -117,8 +132,8 @@ var _ = Describe("Config", func() { _, storeProcedureFound := conf.Db[db.StoredProcedureDb] Expect(storeProcedureFound).To(BeTrue()) Expect(conf.Db[db.StoredProcedureDb].URL).To(Equal(expectedDbUrl)) - Expect(mockVCAPConfigurationReader.MaterializeDBFromServiceCallCount()).To(Equal(2)) - actualDbName := mockVCAPConfigurationReader.MaterializeDBFromServiceArgsForCall(1) + Expect(mockVCAPConfigurationReader.MaterializeDBFromServiceCallCount()).To(Equal(3)) + actualDbName := mockVCAPConfigurationReader.MaterializeDBFromServiceArgsForCall(2) Expect(actualDbName).To(Equal(db.StoredProcedureDb)) }) @@ -314,6 +329,12 @@ health: MaxIdleConnections: 5, ConnectionMaxLifetime: 60 * time.Second, } + conf.Db[db.BindingDb] = db.DatabaseConfig{ + URL: "postgres://pqgotest:password@localhost/pqgotest", + MaxOpenConnections: 10, + MaxIdleConnections: 5, + ConnectionMaxLifetime: 60 * time.Second, + } conf.RateLimit.MaxAmount = 10 conf.RateLimit.ValidDuration = 1 * time.Second @@ -385,7 +406,17 @@ health: }) It("should error", func() { - Expect(err).To(MatchError(MatchRegexp("Policy DB url is empty"))) + Expect(err).To(MatchError(MatchRegexp("configuration error: Policy DB url is empty"))) + }) + }) + + When("binding db url is not set", func() { + BeforeEach(func() { + conf.Db[db.BindingDb] = db.DatabaseConfig{URL: ""} + }) + + It("should error", func() { + Expect(err).To(MatchError(MatchRegexp("configuration error: Binding DB url is empty"))) }) }) diff --git a/src/autoscaler/metricsforwarder/server/auth/auth_suite_test.go b/src/autoscaler/metricsforwarder/server/auth/auth_suite_test.go index e6c49bbbc5..7a706f6f56 100644 --- a/src/autoscaler/metricsforwarder/server/auth/auth_suite_test.go +++ b/src/autoscaler/metricsforwarder/server/auth/auth_suite_test.go @@ -26,6 +26,7 @@ var ( serverProcess ifrit.Process serverUrl string policyDB *fakes.FakePolicyDB + fakeBindingDB *fakes.FakeBindingDB rateLimiter *fakes.FakeLimiter fakeCredentials *fakes.FakeCredentials @@ -75,13 +76,14 @@ var _ = SynchronizedBeforeSuite(func() []byte { LoggregatorConfig: loggregatorConfig, } policyDB = &fakes.FakePolicyDB{} + fakeBindingDB = &fakes.FakeBindingDB{} credentialCache = *cache.New(10*time.Minute, -1) allowedMetricCache = *cache.New(10*time.Minute, -1) httpStatusCollector := &fakes.FakeHTTPStatusCollector{} rateLimiter = &fakes.FakeLimiter{} fakeCredentials = &fakes.FakeCredentials{} - httpServer, err := NewServer(lager.NewLogger("test"), conf, policyDB, + httpServer, err := NewServer(lager.NewLogger("test"), conf, policyDB, fakeBindingDB, fakeCredentials, allowedMetricCache, httpStatusCollector, rateLimiter) Expect(err).NotTo(HaveOccurred()) serverUrl = fmt.Sprintf("http://127.0.0.1:%d", conf.Server.Port) diff --git a/src/autoscaler/metricsforwarder/server/auth/auth_test.go b/src/autoscaler/metricsforwarder/server/auth/auth_test.go index 3f39950be7..43b93b8d5d 100644 --- a/src/autoscaler/metricsforwarder/server/auth/auth_test.go +++ b/src/autoscaler/metricsforwarder/server/auth/auth_test.go @@ -24,22 +24,26 @@ var _ = Describe("Authentication", func() { var ( authTest *auth.Auth fakeCredentials *fakes.FakeCredentials + fakeBindingDB *fakes.FakeBindingDB resp *httptest.ResponseRecorder req *http.Request body []byte vars map[string]string + testAppId string ) BeforeEach(func() { fakeCredentials = &fakes.FakeCredentials{} + fakeBindingDB = &fakes.FakeBindingDB{} vars = make(map[string]string) + testAppId = "an-app-id" resp = httptest.NewRecorder() }) JustBeforeEach(func() { logger := lager.NewLogger("auth-test") var err error - authTest, err = auth.New(logger, fakeCredentials) + authTest, err = auth.New(logger, fakeCredentials, fakeBindingDB) Expect(err).ToNot(HaveOccurred()) }) @@ -47,7 +51,7 @@ var _ = Describe("Authentication", func() { Context("a request to publish custom metrics comes", func() { Context("credentials are valid", func() { It("should validate the credentials", func() { - req = CreateRequest(body) + req = CreateRequest(body, testAppId) req.Header.Add("Authorization", "Basic dXNlcm5hbWU6cGFzc3dvcmQ=") vars["appid"] = "an-app-id" nextCalled := 0 @@ -65,7 +69,7 @@ var _ = Describe("Authentication", func() { Context("credentials are valid but db error occurs", func() { It("should validate the credentials", func() { - req = CreateRequest(body) + req = CreateRequest(body, testAppId) req.Header.Add("Authorization", "Basic dXNlcm5hbWU6cGFzc3dvcmQ=") vars["appid"] = "an-app-id" nextCalled := 0 @@ -83,7 +87,7 @@ var _ = Describe("Authentication", func() { Context("credentials are invalid", func() { It("should validate the credentials", func() { - req = CreateRequest(body) + req = CreateRequest(body, testAppId) req.Header.Add("Authorization", "Basic dXNlcm5hbWU6cGFzc3dvcmQ=") vars["appid"] = "an-app-id" nextCalled := 0 @@ -107,7 +111,7 @@ var _ = Describe("Authentication", func() { const validClientCert1 = "../../../../../test-certs/validmtls_client-1.crt" Context("correct xfcc header with correct CA is supplied for cert 1", func() { It("should call next handler", func() { - req = CreateRequest(body) + req = CreateRequest(body, testAppId) req.Header.Add("X-Forwarded-Client-Cert", MustReadXFCCcert(validClientCert1)) vars["appid"] = "an-app-id" nextCalled := 0 @@ -125,7 +129,7 @@ var _ = Describe("Authentication", func() { Context("correct xfcc header with correct CA is supplied for cert 2", func() { It("should call next handler", func() { - req = CreateRequest(body) + req = CreateRequest(body, testAppId) const validClientCert2 = "../../../../../test-certs/validmtls_client-2.crt" req.Header.Add("X-Forwarded-Client-Cert", MustReadXFCCcert(validClientCert2)) vars["appid"] = "an-app-id" @@ -144,7 +148,7 @@ var _ = Describe("Authentication", func() { Context("correct xfcc header including \"'s around the cert", func() { It("should call next handler", func() { - req = CreateRequest(body) + req = CreateRequest(body, testAppId) req.Header.Add("X-Forwarded-Client-Cert", fmt.Sprintf("%q", MustReadXFCCcert(validClientCert1))) vars["appid"] = "an-app-id" nextCalled := 0 @@ -162,7 +166,7 @@ var _ = Describe("Authentication", func() { Context("valid cert with wrong app-id is supplied", func() { It("should return status code 403", func() { - req = CreateRequest(body) + req = CreateRequest(body, testAppId) req.Header.Add("X-Forwarded-Client-Cert", MustReadXFCCcert(validClientCert1)) nextCalled := 0 nextFunc := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -178,6 +182,53 @@ var _ = Describe("Authentication", func() { Expect(nextCalled).To(Equal(0)) }) }) + + Context("Request from neighbour (different) app arrives for app B", func() { + const validClientCert2 = "../../../../../test-certs/validmtls_client-2.crt" + When("custom-metrics-submission-strategy is not set in the scaling policy", func() { + It("It should not call next handler and return with status code 403", func() { + testAppId = "app-to-scale-id" + req = CreateRequest(body, testAppId) + vars["appid"] = testAppId + req.Header.Add("X-Forwarded-Client-Cert", MustReadXFCCcert(validClientCert2)) + fakeBindingDB.GetCustomMetricStrategyByAppIdReturns("same_app", nil) + nextCalled := 0 + nextFunc := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + nextCalled = nextCalled + 1 + }) + + authTest.AuthenticateHandler(nextFunc)(resp, req, vars) + + Expect(policyDB.GetCredentialCallCount()).To(Equal(0)) + Expect(resp.Code).To(Equal(http.StatusForbidden)) + Expect(resp.Body.String()).To(Equal(`{"code":"Forbidden","message":"Unauthorized"}`)) + Expect(nextCalled).To(Equal(0)) + }) + }) + Context("custom-metrics-submission-strategy is set to bound_app in the scaling policy", func() { + It("It should call next handler and return with status code 200", func() { + req = CreateRequest(body, testAppId) + testAppId = "app-to-scale-id" + vars["appid"] = testAppId + req.Header.Add("X-Forwarded-Client-Cert", MustReadXFCCcert(validClientCert2)) + // ToDO: this should be read via configurations aka scaling policy binding parameters + fakeBindingDB.GetCustomMetricStrategyByAppIdReturns("bound_app", nil) + nextCalled := 0 + nextFunc := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + nextCalled = nextCalled + 1 + }) + fakeBindingDB.IsAppBoundToSameAutoscalerReturns(true, nil) + + authTest.AuthenticateHandler(nextFunc)(resp, req, vars) + + Expect(fakeBindingDB.IsAppBoundToSameAutoscalerCallCount()).To(Equal(1)) + Expect(resp.Code).To(Equal(http.StatusOK)) + Expect(resp.Body.String()).To(BeEmpty()) + Expect(nextCalled).To(Equal(1)) + }) + }) + + }) }) }) @@ -190,8 +241,8 @@ func MustReadXFCCcert(fileName string) string { return base64.StdEncoding.EncodeToString(block.Bytes) } -func CreateRequest(body []byte) *http.Request { - req, err := http.NewRequest(http.MethodPost, serverUrl+"/v1/apps/an-app-id/metrics", bytes.NewReader(body)) +func CreateRequest(body []byte, appId string) *http.Request { + req, err := http.NewRequest(http.MethodPost, serverUrl+"/v1/apps/"+appId+"/metrics", bytes.NewReader(body)) Expect(err).ToNot(HaveOccurred()) req.Header.Add("Content-Type", "application/json") return req diff --git a/src/autoscaler/metricsforwarder/server/auth/authenticator.go b/src/autoscaler/metricsforwarder/server/auth/authenticator.go index eee5bc7e08..d250b7aa22 100644 --- a/src/autoscaler/metricsforwarder/server/auth/authenticator.go +++ b/src/autoscaler/metricsforwarder/server/auth/authenticator.go @@ -4,6 +4,8 @@ import ( "errors" "net/http" + "code.cloudfoundry.org/app-autoscaler/src/autoscaler/db" + "code.cloudfoundry.org/app-autoscaler/src/autoscaler/cred_helper" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/metricsforwarder/server/common" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/models" @@ -17,12 +19,14 @@ var ErrorAuthNotFound = errors.New("authentication method not found") type Auth struct { logger lager.Logger credentials cred_helper.Credentials + bindingDB db.BindingDB } -func New(logger lager.Logger, credentials cred_helper.Credentials) (*Auth, error) { +func New(logger lager.Logger, credentials cred_helper.Credentials, bindingDB db.BindingDB) (*Auth, error) { return &Auth{ logger: logger, credentials: credentials, + bindingDB: bindingDB, }, nil } @@ -52,10 +56,10 @@ func (a *Auth) AuthenticateHandler(next http.Handler) func(w http.ResponseWriter func (a *Auth) CheckAuth(r *http.Request, appID string) error { var errAuth error - errAuth = a.XFCCAuth(r, appID) + errAuth = a.XFCCAuth(r, a.bindingDB, appID) if errAuth != nil { if errors.Is(errAuth, ErrXFCCHeaderNotFound) { - a.logger.Info("Trying basic auth") + a.logger.Info("Trying basic auth", lager.Data{"app_id": appID}) errAuth = a.BasicAuth(r, appID) } } diff --git a/src/autoscaler/metricsforwarder/server/auth/custom_metrics_strategy.go b/src/autoscaler/metricsforwarder/server/auth/custom_metrics_strategy.go new file mode 100644 index 0000000000..a4df128c04 --- /dev/null +++ b/src/autoscaler/metricsforwarder/server/auth/custom_metrics_strategy.go @@ -0,0 +1,46 @@ +package auth + +import ( + "net/http" + + "code.cloudfoundry.org/app-autoscaler/src/autoscaler/db" + "code.cloudfoundry.org/lager/v3" +) + +type MetricsSubmissionStrategy interface { + validate(appId string, submitterAppIdFromCert string, logger lager.Logger, bindingDB db.BindingDB, r *http.Request) error +} + +var _ MetricsSubmissionStrategy = &DefaultMetricsSubmissionStrategy{} + +type DefaultMetricsSubmissionStrategy struct{} + +func (d *DefaultMetricsSubmissionStrategy) validate(appId string, submitterAppIdFromCert string, _ lager.Logger, _ db.BindingDB, _ *http.Request) error { + // check if appID is same as AppIdFromCert + if appId != submitterAppIdFromCert { + return ErrorAppIDWrong + } + return nil +} + +type BoundedMetricsSubmissionStrategy struct{} + +func (c *BoundedMetricsSubmissionStrategy) validate(appToScaleID string, submitterAppIdFromCert string, logger lager.Logger, bindingDB db.BindingDB, r *http.Request) error { + if appToScaleID != submitterAppIdFromCert { + return c.verifyMetricSubmissionStrategy(r, logger, bindingDB, submitterAppIdFromCert, appToScaleID) + } + return nil +} + +func (c *BoundedMetricsSubmissionStrategy) verifyMetricSubmissionStrategy(r *http.Request, logger lager.Logger, bindingDB db.BindingDB, submitterAppIDFromCert string, appToScaleID string) error { + isAppBound, err := bindingDB.IsAppBoundToSameAutoscaler(r.Context(), submitterAppIDFromCert, appToScaleID) + if err != nil { + logger.Error("error-checking-app-bound-to-same-service", err, lager.Data{"metric-submitter-app-id": submitterAppIDFromCert}) + return err + } + if !isAppBound { + logger.Info("app-not-bound-to-same-service", lager.Data{"app-id": submitterAppIDFromCert}) + return ErrorAppNotBound + } + return nil +} diff --git a/src/autoscaler/metricsforwarder/server/auth/xfcc_auth.go b/src/autoscaler/metricsforwarder/server/auth/xfcc_auth.go index a91980cbe8..994d1151f9 100644 --- a/src/autoscaler/metricsforwarder/server/auth/xfcc_auth.go +++ b/src/autoscaler/metricsforwarder/server/auth/xfcc_auth.go @@ -7,13 +7,18 @@ import ( "fmt" "net/http" "strings" + + "code.cloudfoundry.org/app-autoscaler/src/autoscaler/db" + "code.cloudfoundry.org/app-autoscaler/src/autoscaler/models" + "code.cloudfoundry.org/lager/v3" ) var ErrXFCCHeaderNotFound = errors.New("mTLS authentication method not found") var ErrorNoAppIDFound = errors.New("certificate does not contain an app id") -var ErrorAppIDWrong = errors.New("app id in certificate is not valid") +var ErrorAppIDWrong = errors.New("app is not allowed to send metrics due to invalid app id in certificate") +var ErrorAppNotBound = errors.New("application is not bound to the same service instance") -func (a *Auth) XFCCAuth(r *http.Request, appID string) error { +func (a *Auth) XFCCAuth(r *http.Request, bindingDB db.BindingDB, appToScaleID string) error { xfccHeader := r.Header.Get("X-Forwarded-Client-Cert") if xfccHeader == "" { return ErrXFCCHeaderNotFound @@ -29,20 +34,37 @@ func (a *Auth) XFCCAuth(r *http.Request, appID string) error { return fmt.Errorf("failed to parse certificate: %w", err) } - certAppId := getAppId(cert) + submitterAppIDFromCert := readAppIdFromCert(cert) - if len(certAppId) == 0 { + if len(submitterAppIDFromCert) == 0 { return ErrorNoAppIDFound } - if appID != certAppId { - return ErrorAppIDWrong + // Case: Submitting app is not the same as the app to scale + if appToScaleID != submitterAppIDFromCert { + var metricSubmissionStrategy MetricsSubmissionStrategy + customMetricSubmissionStrategy, err := bindingDB.GetCustomMetricStrategyByAppId(r.Context(), appToScaleID) + if err != nil { + a.logger.Error("failed-to-get-custom-metric-strategy", err, lager.Data{"appToScaleID": appToScaleID}) + return err + } + a.logger.Info("custom-metrics-submission-strategy", lager.Data{"appToScaleID": appToScaleID, "submitterAppIDFromCert": submitterAppIDFromCert, "strategy": customMetricSubmissionStrategy}) + + if customMetricSubmissionStrategy == models.CustomMetricsBoundApp { + metricSubmissionStrategy = &BoundedMetricsSubmissionStrategy{} + } else { + metricSubmissionStrategy = &DefaultMetricsSubmissionStrategy{} + } + err = metricSubmissionStrategy.validate(appToScaleID, submitterAppIDFromCert, a.logger, bindingDB, r) + if err != nil { + return err + } } return nil } -func getAppId(cert *x509.Certificate) string { +func readAppIdFromCert(cert *x509.Certificate) string { var certAppId string for _, ou := range cert.Subject.OrganizationalUnit { if strings.Contains(ou, "app:") { diff --git a/src/autoscaler/metricsforwarder/server/custom_metrics_handlers.go b/src/autoscaler/metricsforwarder/server/custom_metrics_handlers.go index e3807c7a28..bd32de39ad 100644 --- a/src/autoscaler/metricsforwarder/server/custom_metrics_handlers.go +++ b/src/autoscaler/metricsforwarder/server/custom_metrics_handlers.go @@ -20,23 +20,25 @@ import ( ) var ( - ErrorReadingBody = errors.New("error reading custom metrics request body") - ErrorUnmarshalingBody = errors.New("error unmarshaling custom metrics request body") - ErrorParsingBody = errors.New("error parsing request body") + ErrorReadingBody = errors.New("error reading custom metrics request body") + ErrorUnmarshallingBody = errors.New("error unmarshalling custom metrics request body") + ErrorParsingBody = errors.New("error parsing request body") ) type CustomMetricsHandler struct { metricForwarder forwarder.MetricForwarder policyDB db.PolicyDB + bindingDB db.BindingDB allowedMetricCache cache.Cache cacheTTL time.Duration logger lager.Logger } -func NewCustomMetricsHandler(logger lager.Logger, metricForwarder forwarder.MetricForwarder, policyDB db.PolicyDB, allowedMetricCache cache.Cache) *CustomMetricsHandler { +func NewCustomMetricsHandler(logger lager.Logger, metricForwarder forwarder.MetricForwarder, policyDB db.PolicyDB, bindingDB db.BindingDB, allowedMetricCache cache.Cache) *CustomMetricsHandler { return &CustomMetricsHandler{ metricForwarder: metricForwarder, policyDB: policyDB, + bindingDB: bindingDB, allowedMetricCache: allowedMetricCache, logger: logger, } @@ -51,7 +53,7 @@ func (mh *CustomMetricsHandler) VerifyCredentialsAndPublishMetrics(w http.Respon handlers.WriteJSONResponse(w, http.StatusInternalServerError, models.ErrorResponse{ Code: "Internal-Server-Error", Message: "error reading custom metrics request body"}) - } else if errors.Is(err, ErrorUnmarshalingBody) { + } else if errors.Is(err, ErrorUnmarshallingBody) { handlers.WriteJSONResponse(w, http.StatusBadRequest, models.ErrorResponse{ Code: "Bad-Request", Message: "Error unmarshaling custom metrics request body"}) @@ -97,12 +99,12 @@ func (mh *CustomMetricsHandler) PublishMetrics(w http.ResponseWriter, r *http.Re var metricsConsumer *models.MetricsConsumer err = json.Unmarshal(body, &metricsConsumer) if err != nil { - mh.logger.Error("error-unmarshaling-metrics", err, lager.Data{"body": r.Body}) - return ErrorUnmarshalingBody + mh.logger.Error("error-unmarshalling-metrics", err, lager.Data{"body": r.Body}) + return ErrorUnmarshallingBody } err = mh.validateCustomMetricTypes(appID, metricsConsumer) if err != nil { - mh.logger.Error("failed-validating-metrictypes", err, lager.Data{"metrics": metricsConsumer}) + mh.logger.Error("failed-validating-metric-types", err, lager.Data{"metrics": metricsConsumer}) return fmt.Errorf("metric validation Failed %w", err) } @@ -138,7 +140,17 @@ func (mh *CustomMetricsHandler) validateCustomMetricTypes(appGUID string, metric // AllowedMetrics found in cache allowedMetricTypeSet = res.(map[string]struct{}) } else { - // AllowedMetrics not found in cache, find AllowedMetrics from Database + // allow app with strategy as bound_app to submit metrics without policy + isAppWithBoundStrategy, err := mh.isAppWithBoundStrategy(appGUID) + if err != nil { + mh.logger.Error("error-finding-app-submission-strategy", err, lager.Data{"appId": appGUID}) + return err + } + if isAppWithBoundStrategy { + mh.logger.Info("app-with-bound-strategy-found", lager.Data{"appId": appGUID}) + return nil + } + scalingPolicy, err := mh.policyDB.GetAppPolicy(context.TODO(), appGUID) if err != nil { mh.logger.Error("error-getting-policy", err, lager.Data{"appId": appGUID}) @@ -172,6 +184,20 @@ func (mh *CustomMetricsHandler) validateCustomMetricTypes(appGUID string, metric return nil } +func (mh *CustomMetricsHandler) isAppWithBoundStrategy(appGUID string) (bool, error) { + // allow app with submission_strategy as bound_app to submit custom metrics even without policy + submissionStrategy, err := mh.bindingDB.GetCustomMetricStrategyByAppId(context.TODO(), appGUID) + if err != nil { + mh.logger.Error("error-getting-custom-metrics-strategy", err, lager.Data{"appId": appGUID}) + return false, err + } + if submissionStrategy == "bound_app" { + mh.logger.Info("bounded-metrics-submission-strategy", lager.Data{"appId": appGUID, "submission_strategy": submissionStrategy}) + return true, nil + } + return false, nil +} + func (mh *CustomMetricsHandler) getMetrics(appID string, metricsConsumer *models.MetricsConsumer) []*models.CustomMetric { var metrics []*models.CustomMetric for _, metric := range metricsConsumer.CustomMetrics { diff --git a/src/autoscaler/metricsforwarder/server/custom_metrics_handlers_test.go b/src/autoscaler/metricsforwarder/server/custom_metrics_handlers_test.go index 9eea0d12ef..31bf3d8e94 100644 --- a/src/autoscaler/metricsforwarder/server/custom_metrics_handlers_test.go +++ b/src/autoscaler/metricsforwarder/server/custom_metrics_handlers_test.go @@ -29,6 +29,7 @@ var _ = Describe("MetricHandler", func() { allowedMetricTypeSet map[string]struct{} policyDB *fakes.FakePolicyDB + fakeBindingDB *fakes.FakeBindingDB metricsforwarder *fakes.FakeMetricForwarder resp *httptest.ResponseRecorder @@ -46,12 +47,13 @@ var _ = Describe("MetricHandler", func() { BeforeEach(func() { logger := lager.NewLogger("metrichandler-test") policyDB = &fakes.FakePolicyDB{} + fakeBindingDB = &fakes.FakeBindingDB{} metricsforwarder = &fakes.FakeMetricForwarder{} allowedMetricCache = *cache.New(10*time.Minute, -1) allowedMetricTypeSet = make(map[string]struct{}) vars = make(map[string]string) resp = httptest.NewRecorder() - handler = NewCustomMetricsHandler(logger, metricsforwarder, policyDB, allowedMetricCache) + handler = NewCustomMetricsHandler(logger, metricsforwarder, policyDB, fakeBindingDB, allowedMetricCache) allowedMetricCache.Flush() }) @@ -71,7 +73,7 @@ var _ = Describe("MetricHandler", func() { }, nil) body = []byte(`{ "instance_index":0, - "test" : + "test" : "metrics":[ { "name":"custom_metric1", @@ -265,6 +267,55 @@ var _ = Describe("MetricHandler", func() { }) }) + + Context("when a valid request to publish custom metrics comes from a neighbour App", func() { + When("neighbour app is bound to same autoscaler instance with policy", func() { + BeforeEach(func() { + scalingPolicy = &models.ScalingPolicy{ + InstanceMin: 1, + InstanceMax: 6, + ScalingRules: []*models.ScalingRule{{ + MetricType: "queuelength", + BreachDurationSeconds: 60, + Threshold: 10, + Operator: ">", + CoolDownSeconds: 60, + Adjustment: "+1"}}} + policyDB.GetAppPolicyReturns(scalingPolicy, nil) + customMetrics := []*models.CustomMetric{ + { + Name: "queuelength", Value: 12, Unit: "unit", InstanceIndex: 1, AppGUID: "an-app-id", + }, + } + body, err = json.Marshal(models.MetricsConsumer{InstanceIndex: 0, CustomMetrics: customMetrics}) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should returns status code 200 and policy exists", func() { + Expect(resp.Code).To(Equal(http.StatusOK)) + Expect(policyDB.GetAppPolicyCallCount()).To(Equal(1)) + + }) + }) + When("neighbour app is bound to same autoscaler instance without policy", func() { + BeforeEach(func() { + fakeBindingDB.GetCustomMetricStrategyByAppIdReturns("bound_app", nil) + customMetrics := []*models.CustomMetric{ + { + Name: "queuelength", Value: 12, Unit: "unit", InstanceIndex: 1, AppGUID: "an-app-id", + }, + } + body, err = json.Marshal(models.MetricsConsumer{InstanceIndex: 0, CustomMetrics: customMetrics}) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should returns status code 200", func() { + Expect(resp.Code).To(Equal(http.StatusOK)) + Expect(fakeBindingDB.GetCustomMetricStrategyByAppIdCallCount()).To(Equal(1)) + + }) + }) + }) }) }) diff --git a/src/autoscaler/metricsforwarder/server/server.go b/src/autoscaler/metricsforwarder/server/server.go index 582ee37b0e..f98f6798cf 100644 --- a/src/autoscaler/metricsforwarder/server/server.go +++ b/src/autoscaler/metricsforwarder/server/server.go @@ -23,14 +23,14 @@ import ( "github.com/tedsuo/ifrit" ) -func NewServer(logger lager.Logger, conf *config.Config, policyDb db.PolicyDB, credentials cred_helper.Credentials, allowedMetricCache cache.Cache, httpStatusCollector healthendpoint.HTTPStatusCollector, rateLimiter ratelimiter.Limiter) (ifrit.Runner, error) { +func NewServer(logger lager.Logger, conf *config.Config, policyDb db.PolicyDB, bindingDB db.BindingDB, credentials cred_helper.Credentials, allowedMetricCache cache.Cache, httpStatusCollector healthendpoint.HTTPStatusCollector, rateLimiter ratelimiter.Limiter) (ifrit.Runner, error) { metricForwarder, err := createMetricForwarder(logger, conf) if err != nil { return nil, fmt.Errorf("failed to create metric forwarder: %w", err) } - mh := NewCustomMetricsHandler(logger, *metricForwarder, policyDb, allowedMetricCache) - authenticator, err := auth.New(logger, credentials) + mh := NewCustomMetricsHandler(logger, *metricForwarder, policyDb, bindingDB, allowedMetricCache) + authenticator, err := auth.New(logger, credentials, bindingDB) if err != nil { return nil, fmt.Errorf("failed to create auth middleware: %w", err) } diff --git a/src/autoscaler/metricsforwarder/server/server_suite_test.go b/src/autoscaler/metricsforwarder/server/server_suite_test.go index 23dbdbcf31..f620989f49 100644 --- a/src/autoscaler/metricsforwarder/server/server_suite_test.go +++ b/src/autoscaler/metricsforwarder/server/server_suite_test.go @@ -24,10 +24,13 @@ import ( ) var ( - conf *config.Config - serverProcess ifrit.Process - serverUrl string - policyDB *fakes.FakePolicyDB + conf *config.Config + serverProcess ifrit.Process + serverUrl string + policyDB *fakes.FakePolicyDB + + fakeBindingDB *fakes.FakeBindingDB + rateLimiter *fakes.FakeLimiter fakeCredentials *fakes.FakeCredentials @@ -82,6 +85,8 @@ var _ = SynchronizedBeforeSuite(func() []byte { Health: healthConfig, } policyDB = &fakes.FakePolicyDB{} + fakeBindingDB = &fakes.FakeBindingDB{} + allowedMetricCache = *cache.New(10*time.Minute, -1) httpStatusCollector := healthendpoint.NewHTTPStatusCollector("autoscaler", "metricsforwarder") @@ -91,7 +96,7 @@ var _ = SynchronizedBeforeSuite(func() []byte { logger := lager.NewLogger("server_suite_test") logger.RegisterSink(lager.NewWriterSink(GinkgoWriter, lager.DEBUG)) - httpServer, err := NewServer(logger, conf, policyDB, + httpServer, err := NewServer(logger, conf, policyDB, fakeBindingDB, fakeCredentials, allowedMetricCache, httpStatusCollector, rateLimiter) Expect(err).NotTo(HaveOccurred()) serverUrl = fmt.Sprintf("http://127.0.0.1:%d", conf.Server.Port) diff --git a/src/autoscaler/metricsforwarder/server/server_test.go b/src/autoscaler/metricsforwarder/server/server_test.go index 8c3d121249..fb7698ed95 100644 --- a/src/autoscaler/metricsforwarder/server/server_test.go +++ b/src/autoscaler/metricsforwarder/server/server_test.go @@ -44,7 +44,7 @@ func setupRequest(method, url, authHeader string, body []byte) (*http.Client, *h return client, req, nil } -var _ = Describe("CustomMetrics Server", func() { +var _ = Describe("CustomMetricsConfig Server", func() { var ( resp *http.Response req *http.Request diff --git a/src/autoscaler/models/api.go b/src/autoscaler/models/api.go index 9d9d87153b..8f8664e15b 100644 --- a/src/autoscaler/models/api.go +++ b/src/autoscaler/models/api.go @@ -49,9 +49,15 @@ type ServiceInstance struct { } type ServiceBinding struct { - ServiceBindingID string `db:"binding_id"` - ServiceInstanceID string `db:"service_instance_id"` - AppID string `db:"app_id"` + ServiceBindingID string `db:"binding_id"` + ServiceInstanceID string `db:"service_instance_id"` + AppID string `db:"app_id"` + CustomMetricsStrategy string `db:"custom_metrics_strategy"` +} + +type BindingConfigWithScaling struct { + BindingConfig + ScalingPolicy } type BindingRequestBody struct { diff --git a/src/autoscaler/models/binding_configs.go b/src/autoscaler/models/binding_configs.go new file mode 100644 index 0000000000..1efc50a693 --- /dev/null +++ b/src/autoscaler/models/binding_configs.go @@ -0,0 +1,44 @@ +package models + +// BindingConfig +/* The configuration object received as part of the binding parameters. Example config: +{ + "configuration": { + "custom_metrics": { + "auth": { + "credential_type": "binding_secret" + }, + "metric_submission_strategy": { + "allow_from": "bound_app or same_app" + } + } + } +*/ + +const ( + CustomMetricsBoundApp = "bound_app" + CustomMetricsSameApp = "same_app" +) + +type BindingConfig struct { + Configuration Configuration `json:"configuration"` +} +type Configuration struct { + CustomMetrics CustomMetricsConfig `json:"custom_metrics"` +} + +type CustomMetricsConfig struct { + MetricSubmissionStrategy MetricsSubmissionStrategy `json:"metric_submission_strategy"` +} + +type MetricsSubmissionStrategy struct { + AllowFrom string `json:"allow_from"` +} + +func (b *BindingConfig) GetCustomMetricsStrategy() string { + return b.Configuration.CustomMetrics.MetricSubmissionStrategy.AllowFrom +} + +func (b *BindingConfig) SetCustomMetricsStrategy(allowFrom string) { + b.Configuration.CustomMetrics.MetricSubmissionStrategy.AllowFrom = allowFrom +} diff --git a/src/autoscaler/mta.tpl.yaml b/src/autoscaler/mta.tpl.yaml index 7f61c7dac9..a57411852d 100644 --- a/src/autoscaler/mta.tpl.yaml +++ b/src/autoscaler/mta.tpl.yaml @@ -41,6 +41,7 @@ resources: parameters: service-tags: - policy_db + - binding_db - relational - name: syslog-client type: org.cloudfoundry.user-provided-service diff --git a/templates/app-autoscaler.yml b/templates/app-autoscaler.yml index fc72485f0f..66b62f37c5 100644 --- a/templates/app-autoscaler.yml +++ b/templates/app-autoscaler.yml @@ -479,6 +479,7 @@ instance_groups: properties: autoscaler: policy_db: *database + binding_db: *database policy_db_connection_config: *databaseConnectionConfig metricsforwarder: health: