Skip to content

Commit

Permalink
Add an integration test that runs iris pipeline to completion
Browse files Browse the repository at this point in the history
Fix pre-commit issue

Updated test

Update test

Paremeterize Pipeline display name

Update nitpick suggestions

Update test
  • Loading branch information
VaniHaripriya committed Apr 24, 2024
1 parent 4dda89c commit 93ce844
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 0 deletions.
57 changes: 57 additions & 0 deletions tests/pipeline_runs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//go:build test_integration

/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
"bytes"
"fmt"
"io"
"net/http"
"testing"

TestUtil "github.com/opendatahub-io/data-science-pipelines-operator/tests/util"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func (suite *IntegrationTestSuite) TestPipelineSuccessfulRun() {

suite.T().Run("Should create a Pipeline Run", func(t *testing.T) {
// Retrieve Pipeline ID to create a new run
pipelineDisplayName := "[Demo] iris-training"
pipelineID := TestUtil.RetrievePipelineId(t, APIServerURL, pipelineDisplayName)
postUrl := fmt.Sprintf("%s/apis/v2beta1/runs", APIServerURL)
body := TestUtil.FormatRequestBody(t, pipelineID, pipelineDisplayName)
contentType := "application/json"
// Create a new run
response, err := http.Post(postUrl, contentType, bytes.NewReader(body))
require.NoError(t, err)
responseData, err := io.ReadAll(response.Body)
responseString := string(responseData)
loggr.Info(responseString)
require.NoError(t, err)
assert.Equal(t, 200, response.StatusCode)
})

suite.T().Run("Should successfully complete the Pipeleine Run", func(t *testing.T) {
err := TestUtil.WaitForPipelineRunCompletion(t, APIServerURL)
require.NoError(t, err)
})

}
89 changes: 89 additions & 0 deletions tests/util/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,32 @@ package testUtil

import (
"bytes"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
)

type PipelineRequest struct {
DisplayName string `json:"display_name"`
PipelineVersionReference struct {
PipelineID string `json:"pipeline_id"`
} `json:"pipeline_version_reference"`
}
type Pipeline struct {
Pipelines []struct {
PipelineID string `json:"pipeline_id"`
DisplayName string `json:"display_name"`
} `json:"pipelines"`
}

// FormFromFile creates a multipart form data from the provided form map where the values are paths to files.
// It returns a buffer containing the encoded form data and the content type of the form.
// Requires passing the testing.T object for error handling with Testify.
Expand Down Expand Up @@ -52,3 +69,75 @@ func FormFromFile(t *testing.T, form map[string]string) (*bytes.Buffer, string)

return body, mp.FormDataContentType()
}

func RetrievePipelineId(t *testing.T, APIServerURL string, PipelineDisplayName string) string {
response, err := http.Get(fmt.Sprintf("%s/apis/v2beta1/pipelines", APIServerURL))
require.NoError(t, err)
responseData, err := io.ReadAll(response.Body)
require.NoError(t, err)
var pipelineData Pipeline
var pipelineID string
err = json.Unmarshal(responseData, &pipelineData)
require.NoError(t, err)
for _, pipeline := range pipelineData.Pipelines {
if pipeline.DisplayName == PipelineDisplayName {
pipelineID = pipeline.PipelineID
break
}
}
return pipelineID
}

func FormatRequestBody(t *testing.T, pipelineID string, PipelineDisplayName string) []byte {
requestBody := PipelineRequest{
DisplayName: PipelineDisplayName,
PipelineVersionReference: struct {
PipelineID string `json:"pipeline_id"`
}{PipelineID: pipelineID},
}

body, err := json.Marshal(requestBody)
require.NoError(t, err)
return body
}

func WaitForPipelineRunCompletion(t *testing.T, APIServerURL string) error {
timeout := time.After(6 * time.Minute)
ticker := time.NewTicker(6 * time.Second)
defer ticker.Stop()
for {
select {
case <-timeout:
return fmt.Errorf("timed out waiting for pipeline run completion")
case <-ticker.C:
// Check the status of the pipeline run
status, err := CheckPipelineRunStatus(t, APIServerURL)
require.NoError(t, err)
switch status {
case "SUCCEEDED":
return nil
case "SKIPPED", "FAILED", "CANCELING", "CANCELED", "PAUSED":
return fmt.Errorf("pipeline run status: %s", status)
}
}
}
}

func CheckPipelineRunStatus(t *testing.T, APIServerURL string) (string, error) {
response, err := http.Get(fmt.Sprintf("%s/apis/v2beta1/runs", APIServerURL))
require.NoError(t, err)
responseData, err := io.ReadAll(response.Body)
require.NoError(t, err)
var data map[string]interface{}
var state string
err = json.Unmarshal(responseData, &data)
require.NoError(t, err)

// Extracting the Run state
runs := data["runs"].([]interface{})
for _, run := range runs {
runData := run.(map[string]interface{})
state = runData["state"].(string)
}
return state, nil
}

0 comments on commit 93ce844

Please sign in to comment.