From 93ce844aa2d06e61134b1c1be4f630f93912dec1 Mon Sep 17 00:00:00 2001 From: vmudadla Date: Tue, 2 Apr 2024 12:47:16 -0500 Subject: [PATCH] Add an integration test that runs iris pipeline to completion Fix pre-commit issue Updated test Update test Paremeterize Pipeline display name Update nitpick suggestions Update test --- tests/pipeline_runs_test.go | 57 ++++++++++++++++++++++++ tests/util/rest.go | 89 +++++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+) create mode 100644 tests/pipeline_runs_test.go diff --git a/tests/pipeline_runs_test.go b/tests/pipeline_runs_test.go new file mode 100644 index 00000000..94aa92c7 --- /dev/null +++ b/tests/pipeline_runs_test.go @@ -0,0 +1,57 @@ +//go:build test_integration + +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "bytes" + "fmt" + "io" + "net/http" + "testing" + + TestUtil "github.com/opendatahub-io/data-science-pipelines-operator/tests/util" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func (suite *IntegrationTestSuite) TestPipelineSuccessfulRun() { + + suite.T().Run("Should create a Pipeline Run", func(t *testing.T) { + // Retrieve Pipeline ID to create a new run + pipelineDisplayName := "[Demo] iris-training" + pipelineID := TestUtil.RetrievePipelineId(t, APIServerURL, pipelineDisplayName) + postUrl := fmt.Sprintf("%s/apis/v2beta1/runs", APIServerURL) + body := TestUtil.FormatRequestBody(t, pipelineID, pipelineDisplayName) + contentType := "application/json" + // Create a new run + response, err := http.Post(postUrl, contentType, bytes.NewReader(body)) + require.NoError(t, err) + responseData, err := io.ReadAll(response.Body) + responseString := string(responseData) + loggr.Info(responseString) + require.NoError(t, err) + assert.Equal(t, 200, response.StatusCode) + }) + + suite.T().Run("Should successfully complete the Pipeleine Run", func(t *testing.T) { + err := TestUtil.WaitForPipelineRunCompletion(t, APIServerURL) + require.NoError(t, err) + }) + +} diff --git a/tests/util/rest.go b/tests/util/rest.go index 0da3e847..c71d1842 100644 --- a/tests/util/rest.go +++ b/tests/util/rest.go @@ -15,15 +15,32 @@ package testUtil import ( "bytes" + "encoding/json" + "fmt" "io" "mime/multipart" + "net/http" "os" "strings" "testing" + "time" "github.com/stretchr/testify/require" ) +type PipelineRequest struct { + DisplayName string `json:"display_name"` + PipelineVersionReference struct { + PipelineID string `json:"pipeline_id"` + } `json:"pipeline_version_reference"` +} +type Pipeline struct { + Pipelines []struct { + PipelineID string `json:"pipeline_id"` + DisplayName string `json:"display_name"` + } `json:"pipelines"` +} + // FormFromFile creates a multipart form data from the provided form map where the values are paths to files. // It returns a buffer containing the encoded form data and the content type of the form. // Requires passing the testing.T object for error handling with Testify. @@ -52,3 +69,75 @@ func FormFromFile(t *testing.T, form map[string]string) (*bytes.Buffer, string) return body, mp.FormDataContentType() } + +func RetrievePipelineId(t *testing.T, APIServerURL string, PipelineDisplayName string) string { + response, err := http.Get(fmt.Sprintf("%s/apis/v2beta1/pipelines", APIServerURL)) + require.NoError(t, err) + responseData, err := io.ReadAll(response.Body) + require.NoError(t, err) + var pipelineData Pipeline + var pipelineID string + err = json.Unmarshal(responseData, &pipelineData) + require.NoError(t, err) + for _, pipeline := range pipelineData.Pipelines { + if pipeline.DisplayName == PipelineDisplayName { + pipelineID = pipeline.PipelineID + break + } + } + return pipelineID +} + +func FormatRequestBody(t *testing.T, pipelineID string, PipelineDisplayName string) []byte { + requestBody := PipelineRequest{ + DisplayName: PipelineDisplayName, + PipelineVersionReference: struct { + PipelineID string `json:"pipeline_id"` + }{PipelineID: pipelineID}, + } + + body, err := json.Marshal(requestBody) + require.NoError(t, err) + return body +} + +func WaitForPipelineRunCompletion(t *testing.T, APIServerURL string) error { + timeout := time.After(6 * time.Minute) + ticker := time.NewTicker(6 * time.Second) + defer ticker.Stop() + for { + select { + case <-timeout: + return fmt.Errorf("timed out waiting for pipeline run completion") + case <-ticker.C: + // Check the status of the pipeline run + status, err := CheckPipelineRunStatus(t, APIServerURL) + require.NoError(t, err) + switch status { + case "SUCCEEDED": + return nil + case "SKIPPED", "FAILED", "CANCELING", "CANCELED", "PAUSED": + return fmt.Errorf("pipeline run status: %s", status) + } + } + } +} + +func CheckPipelineRunStatus(t *testing.T, APIServerURL string) (string, error) { + response, err := http.Get(fmt.Sprintf("%s/apis/v2beta1/runs", APIServerURL)) + require.NoError(t, err) + responseData, err := io.ReadAll(response.Body) + require.NoError(t, err) + var data map[string]interface{} + var state string + err = json.Unmarshal(responseData, &data) + require.NoError(t, err) + + // Extracting the Run state + runs := data["runs"].([]interface{}) + for _, run := range runs { + runData := run.(map[string]interface{}) + state = runData["state"].(string) + } + return state, nil +}