Skip to content

Commit

Permalink
feat: tests for new native client and changes in other places to make…
Browse files Browse the repository at this point in the history
… it work with new implementation
  • Loading branch information
renzodavid9 committed Sep 10, 2024
1 parent 6e17b2d commit 35d83ff
Show file tree
Hide file tree
Showing 6 changed files with 411 additions and 92 deletions.
12 changes: 9 additions & 3 deletions pkg/skaffold/deploy/kubectl/kubectl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"testing"
"time"

Expand All @@ -40,6 +39,11 @@ import (
"github.com/GoogleContainerTools/skaffold/v2/testutil"
)

type gcsClientMock struct{}

func (g gcsClientMock) DownloadRecursive(ctx context.Context, src, dst string) error {
return nil
}
func TestKubectlV1RenderDeploy(t *testing.T) {
tests := []struct {
description string
Expand Down Expand Up @@ -520,13 +524,15 @@ func TestGCSManifests(t *testing.T) {
RawK8s: []string{"gs://dev/deployment.yaml"},
},
commands: testutil.
CmdRunOut(fmt.Sprintf("gsutil cp -r %s %s", "gs://dev/deployment.yaml", filepath.Join(manifest.ManifestTmpDir, manifest.ManifestsFromGCS)), "log").
AndRun("kubectl --context kubecontext --namespace testNamespace apply -f -"),
CmdRun("kubectl --context kubecontext --namespace testNamespace apply -f -"),
}}
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
t.Override(&client.Client, deployutil.MockK8sClient)
t.Override(&util.DefaultExecCommand, test.commands)
t.Override(&manifest.GetGCSClient, func() manifest.GCSClient {
return gcsClientMock{}
})
if err := os.MkdirAll(manifest.ManifestTmpDir, os.ModePerm); err != nil {
t.Fatal(err)
}
Expand Down
198 changes: 118 additions & 80 deletions pkg/skaffold/gcs/client/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,21 @@ import (
"google.golang.org/api/iterator"
)

// Obj contains information about the GCS object.
var GetBucketManager = getBucketManager

// bucketHandler defines the available interactaions with a GCS bucket.
type bucketHandler interface {
// ListObjects list the objects that match the given query.
ListObjects(ctx context.Context, q *storage.Query) ([]string, error)
// DownloadObject download the object with the given uri in the localPath.
DownloadObject(ctx context.Context, localPath, uri string) error
// UploadObject creates a files with the given content with the objName.
UploadObject(ctx context.Context, objName string, content *os.File) error
// Close closes the bucket handler connection.
Close()
}

// uriInfo contains information about the GCS object.
type uriInfo struct {
// Bucket is the name of the GCS bucket.
Bucket string
Expand All @@ -48,17 +62,16 @@ type Native struct{}

// Downloads the content that match the given src uri and subfolders.
func (n *Native) DownloadRecursive(ctx context.Context, src, dst string) error {
sc, err := storage.NewClient(ctx)
uriInfo, err := n.parseGCSURI(src)
if err != nil {
return fmt.Errorf("error creating GCS Client: %w", err)
return err
}
defer sc.Close()

uriInfo, err := n.parseGCSURI(src)
bucket, err := GetBucketManager(ctx, uriInfo.Bucket)
if err != nil {
return err
}
bucket := sc.Bucket(uriInfo.Bucket)
defer bucket.Close()

files, err := n.filesToDownload(ctx, bucket, uriInfo)
if err != nil {
Expand All @@ -67,7 +80,15 @@ func (n *Native) DownloadRecursive(ctx context.Context, src, dst string) error {

for uri, localPath := range files {
fullPath := filepath.Join(dst, localPath)
if err := n.downloadFile(ctx, bucket, fullPath, uri); err != nil {

dir := filepath.Dir(fullPath)
if _, err := os.Stat(dir); os.IsNotExist(err) {
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return fmt.Errorf("failed to create directory: %v", err)
}
}

if err := bucket.DownloadObject(ctx, fullPath, uri); err != nil {
return err
}
}
Expand All @@ -77,45 +98,36 @@ func (n *Native) DownloadRecursive(ctx context.Context, src, dst string) error {

// Uploads a single file to the given dst.
func (n *Native) UploadFile(ctx context.Context, src, dst string) error {
sc, err := storage.NewClient(ctx)
if err != nil {
return fmt.Errorf("error creating GCS Client: %w", err)
}
defer sc.Close()

f, err := os.Open(src)
if err != nil {
return fmt.Errorf("error opening file: %w", err)
}
defer f.Close()

uinfo, err := n.parseGCSURI(dst)
urinfo, err := n.parseGCSURI(dst)
if err != nil {
return err
}
bucket := sc.Bucket(uinfo.Bucket)

isDirectory, err := n.isGCSDirectory(ctx, bucket, uinfo)
bucket, err := GetBucketManager(ctx, urinfo.Bucket)
if err != nil {
return err
}

dstObj := uinfo.ObjPath
isDirectory, err := n.isGCSDirectory(ctx, bucket, urinfo)
if err != nil {
return err
}

dstObj := urinfo.ObjPath
if isDirectory {
dstObj, err = url.JoinPath(dstObj, filepath.Base(src))
if err != nil {
return err
}
}

wc := bucket.Object(dstObj).NewWriter(ctx)
if _, err = io.Copy(wc, f); err != nil {
return fmt.Errorf("error copying file to GCS: %w", err)
}
if err := wc.Close(); err != nil {
return fmt.Errorf("error closing GCS writer: %w", err)
}
return nil
return bucket.UploadObject(ctx, dstObj, f)
}

func (n *Native) parseGCSURI(uri string) (uriInfo, error) {
Expand All @@ -136,10 +148,10 @@ func (n *Native) parseGCSURI(uri string) (uriInfo, error) {
return gcsobj, nil
}

func (n *Native) filesToDownload(ctx context.Context, bucket *storage.BucketHandle, uinfo uriInfo) (map[string]string, error) {
func (n *Native) filesToDownload(ctx context.Context, bucket bucketHandler, urinfo uriInfo) (map[string]string, error) {
uriToLocalPath := map[string]string{}

exactMatches, err := n.listObjects(ctx, bucket, &storage.Query{MatchGlob: uinfo.ObjPath})
exactMatches, err := bucket.ListObjects(ctx, &storage.Query{MatchGlob: urinfo.ObjPath})
if err != nil {
return nil, err
}
Expand All @@ -148,58 +160,37 @@ func (n *Native) filesToDownload(ctx context.Context, bucket *storage.BucketHand
uriToLocalPath[match] = filepath.Base(match)
}

recursiveMatches, err := n.recursiveListing(ctx, bucket, uinfo)
recursiveMatches, err := n.recursiveListing(ctx, bucket, urinfo)
if err != nil {
return nil, err
}

for _, match := range recursiveMatches {
uriToLocalPath[match] = match
for uri, match := range recursiveMatches {
uriToLocalPath[uri] = match
}

return uriToLocalPath, nil
}

func (n *Native) listObjects(ctx context.Context, bucket *storage.BucketHandle, q *storage.Query) ([]string, error) {
matches := []string{}
it := bucket.Objects(ctx, q)

for {
attrs, err := it.Next()
if err == iterator.Done {
break
}

if err != nil {
return nil, fmt.Errorf("failed to iterate objects: %v", err)
}

if attrs.Name != "" {
matches = append(matches, attrs.Name)
}
}
return matches, nil
}

func (n *Native) recursiveListing(ctx context.Context, bucket *storage.BucketHandle, uinfo uriInfo) (map[string]string, error) {
func (n *Native) recursiveListing(ctx context.Context, bucket bucketHandler, urinfo uriInfo) (map[string]string, error) {
uriToLocalPath := map[string]string{}
recursiveURI := n.uriForRecursiveSearch(uinfo.ObjPath)
recursiveMatches, err := n.listObjects(ctx, bucket, &storage.Query{MatchGlob: recursiveURI})
recursiveURI := n.uriForRecursiveSearch(urinfo.ObjPath)
recursiveMatches, err := bucket.ListObjects(ctx, &storage.Query{MatchGlob: recursiveURI})
if err != nil {
return nil, err
}

prefixRemovalURI := n.uriForPrefixRemoval(uinfo.Full())
prefixRemovalURI := n.uriForPrefixRemoval(urinfo.Full())
prefixRemovalRegex, err := n.wildcardToRegex(prefixRemovalURI)
if err != nil {
return nil, err
}

shouldRecreateFolders := !strings.Contains(uinfo.ObjPath, "**")
shouldRecreateFolders := !strings.Contains(urinfo.ObjPath, "**")
for _, match := range recursiveMatches {
destPath := filepath.Base(match)
if shouldRecreateFolders {
matchWithBucket := uinfo.Bucket + "/" + match
matchWithBucket := urinfo.Bucket + "/" + match
destPath = string(prefixRemovalRegex.ReplaceAll([]byte(matchWithBucket), []byte("")))
}
uriToLocalPath[match] = destPath
Expand Down Expand Up @@ -254,15 +245,69 @@ func (n *Native) wildcardToRegex(wildcard string) (*regexp.Regexp, error) {
return regexp.Compile(regexStr)
}

func (n *Native) downloadFile(ctx context.Context, bucket *storage.BucketHandle, localPath, uri string) error {
dir := filepath.Dir(localPath)
if _, err := os.Stat(dir); os.IsNotExist(err) {
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return fmt.Errorf("failed to create directory: %v", err)
func (n *Native) isGCSDirectory(ctx context.Context, bucket bucketHandler, urinfo uriInfo) (bool, error) {
if urinfo.ObjPath == "" {
return true, nil
}

if strings.HasSuffix(urinfo.ObjPath, "/") {
return true, nil
}

q := &storage.Query{Prefix: urinfo.ObjPath + "/"}
matches, err := bucket.ListObjects(ctx, q)
if err != nil {
return false, err
}

if len(matches) > 0 {
return true, nil
}

return false, nil
}

func getBucketManager(ctx context.Context, bucketName string) (bucketHandler, error) {
sc, err := storage.NewClient(ctx)
if err != nil {
return nil, fmt.Errorf("error creating GCS Client: %w", err)
}

return nativeBucketHandler{
storageClient: sc,
bucket: sc.Bucket(bucketName),
}, nil
}

// nativeBucketHandler implements a handler using the Cloud client libraries.
type nativeBucketHandler struct {
storageClient *storage.Client
bucket *storage.BucketHandle
}

func (nb nativeBucketHandler) ListObjects(ctx context.Context, q *storage.Query) ([]string, error) {
matches := []string{}
it := nb.bucket.Objects(ctx, q)

for {
attrs, err := it.Next()
if err == iterator.Done {
break
}

if err != nil {
return nil, fmt.Errorf("failed to iterate objects: %v", err)
}

if attrs.Name != "" {
matches = append(matches, attrs.Name)
}
}
return matches, nil
}

reader, err := bucket.Object(uri).NewReader(ctx)
func (nb nativeBucketHandler) DownloadObject(ctx context.Context, localPath, uri string) error {
reader, err := nb.bucket.Object(uri).NewReader(ctx)
if err != nil {
return fmt.Errorf("failed to read object: %v", err)
}
Expand All @@ -281,24 +326,17 @@ func (n *Native) downloadFile(ctx context.Context, bucket *storage.BucketHandle,
return nil
}

func (n *Native) isGCSDirectory(ctx context.Context, bucket *storage.BucketHandle, uinfo uriInfo) (bool, error) {
if uinfo.ObjPath == "" {
return true, nil
}

if strings.HasSuffix(uinfo.ObjPath, "/") {
return true, nil
}

q := &storage.Query{Prefix: uinfo.ObjPath + "/"}
matches, err := n.listObjects(ctx, bucket, q)
if err != nil {
return false, err
func (nb nativeBucketHandler) UploadObject(ctx context.Context, objName string, content *os.File) error {
wc := nb.bucket.Object(objName).NewWriter(ctx)
if _, err := io.Copy(wc, content); err != nil {
return fmt.Errorf("error copying file to GCS: %w", err)
}

if len(matches) > 0 {
return true, nil
if err := wc.Close(); err != nil {
return fmt.Errorf("error closing GCS writer: %w", err)
}
return nil
}

return false, nil
func (nb nativeBucketHandler) Close() {
nb.storageClient.Close()
}
Loading

0 comments on commit 35d83ff

Please sign in to comment.