Skip to content

Commit

Permalink
uplink: enable new concurrent segment upload codepath by default
Browse files Browse the repository at this point in the history
This change enables new concurrent segment upload codepath by default.
It's still possible to disable it with context created with
testuplink.DisableConcurrentSegmentUploads.

As an addition this change fixes small Jenkins issue.

Change-Id: I26229bb0f071edc6433bfdcde404e00be1ea35ab
  • Loading branch information
mniewrzal committed Sep 13, 2023
1 parent 138ef76 commit a4851fc
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 33 deletions.
5 changes: 1 addition & 4 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,12 @@ pipeline {
dir('testsuite'){
sh 'cp go.mod go-temp.mod'
sh 'go vet -modfile go-temp.mod -mod=mod storj.io/storj/...'
sh 'go test -modfile go-temp.mod -mod=mod -parallel 4 -p 6 -vet=off -timeout 20m -json storj.io/storj/... > ../.build/testsuite-storj.json'
sh 'go test -modfile go-temp.mod -mod=mod -tags noembed -parallel 4 -p 6 -vet=off -timeout 20m -json storj.io/storj/... 2>&1 | tee ../.build/testsuite-storj.json | xunit -out ../.build/testsuite-storj.xml'
}
}

post {
always {
dir('testsuite'){
sh 'cat ../.build/testsuite-storj.json | xunit -out ../.build/testsuite-storj.xml'
}
sh script: 'cat .build/testsuite-storj.json | tparse -all -top -slow 100', returnStatus: true
archiveArtifacts artifacts: '.build/testsuite-storj.json'
junit '.build/testsuite-storj.xml'
Expand Down
19 changes: 16 additions & 3 deletions private/testuplink/uplink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type listLimitKey struct{}

type concurrentSegmentUploadsConfigKey struct{}

type disableConcurrentSegmentUploadsKey struct{}

type (
logWriterKey struct{}
logWriterContextKey struct{}
Expand Down Expand Up @@ -111,14 +113,25 @@ func WithConcurrentSegmentUploadsConfig(ctx context.Context, config ConcurrentSe
return context.WithValue(ctx, concurrentSegmentUploadsConfigKey{}, config)
}

// DisableConcurrentSegmentUploads creates a context that disables the new
// concurrent segment upload codepath.
func DisableConcurrentSegmentUploads(ctx context.Context) context.Context {
return context.WithValue(ctx, disableConcurrentSegmentUploadsKey{}, struct{}{})
}

// GetConcurrentSegmentUploadsConfig returns the scheduler options to
// use with the new concurrent segment upload codepath, or nil if no scheduler
// options have been set.
// use with the new concurrent segment upload codepath, if no scheduler
// options have been set it will return default configuration. Concurrent
// segment upload code path can be disabled with DisableConcurrentSegmentUploads.
func GetConcurrentSegmentUploadsConfig(ctx context.Context) *ConcurrentSegmentUploadsConfig {
if value := ctx.Value(disableConcurrentSegmentUploadsKey{}); value != nil {
return nil
}
if config, ok := ctx.Value(concurrentSegmentUploadsConfigKey{}).(ConcurrentSegmentUploadsConfig); ok {
return &config
}
return nil
config := DefaultConcurrentSegmentUploadsConfig()
return &config
}

// WithLogWriter creates context with information about upload log file.
Expand Down
17 changes: 5 additions & 12 deletions testsuite/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,16 +618,11 @@ func TestUploadNotAllowedPath(t *testing.T) {

testData := bytes.NewBuffer(testrand.Bytes(1 * memory.KiB))

upload, err := project.UploadObject(ctx, "testbucket", "first-level-object", nil)
require.NoError(t, err)

_, err = io.Copy(upload, testData)
_, err = project.UploadObject(ctx, "testbucket", "first-level-object", nil)
require.Error(t, err)
require.ErrorIs(t, err, uplink.ErrPermissionDenied)

err = upload.Abort()
require.NoError(t, err)

upload, err = project.UploadObject(ctx, "testbucket", "videos/second-level-object", nil)
upload, err := project.UploadObject(ctx, "testbucket", "videos/second-level-object", nil)
require.NoError(t, err)

_, err = io.Copy(upload, testData)
Expand Down Expand Up @@ -876,11 +871,9 @@ func TestImmutableUpload(t *testing.T) {
}

{ // we shouldn't be able upload to a different location
upload, err := project.UploadObject(ctx, "testbucket", "object2", nil)
require.NoError(t, err)
_, err = upload.Write(testrand.Bytes(5 * memory.KiB))
_, err := project.UploadObject(ctx, "testbucket", "object2", nil)
require.Error(t, err)
require.Error(t, upload.Commit())
require.ErrorIs(t, err, uplink.ErrPermissionDenied)
}

// we shouldn't be able to delete
Expand Down
5 changes: 1 addition & 4 deletions testsuite/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"storj.io/common/errs2"
"storj.io/common/fpath"
"storj.io/common/memory"
"storj.io/common/testcontext"
Expand Down Expand Up @@ -386,9 +385,7 @@ func TestContextCancelUpload(t *testing.T) {
assertObjectEmptyCreated(t, upload.Info(), "test.dat")

uploadcancel()
_, err = upload.Write(randData)
require.Error(t, err)
require.True(t, errs2.IsCanceled(err))
requireWriteEventuallyReturns(t, upload, randData, context.Canceled)

err = upload.Abort()
require.NoError(t, err)
Expand Down
12 changes: 6 additions & 6 deletions testsuite/uplink/metainfo/uplink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ import (
"storj.io/uplink/private/testuplink"
)

func TestMultisegmentUploadWithLastInline(t *testing.T) {
// this is special case were uploaded object has 3 segments (2 remote + 1 inline)
func TestMultisegmentUploadWithoutInlineSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Expand All @@ -36,7 +35,9 @@ func TestMultisegmentUploadWithLastInline(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expectedData, downloaded)

// verify that object has 3 segments, 2 remote + 1 inline
// in the past object with size equal to multiplication of max segment size was uploaded
// as remote segments + one additional inline segment, after upload code path refactor we
// are uploading now only 2 remote segments without last inline segment
objects, err := planet.Satellites[0].Metabase.DB.TestingAllCommittedObjects(ctx, planet.Uplinks[0].Projects[0].ID, "testbucket")
require.NoError(t, err)
require.Len(t, objects, 1)
Expand All @@ -47,9 +48,8 @@ func TestMultisegmentUploadWithLastInline(t *testing.T) {
ObjectKey: objects[0].ObjectKey,
})
require.NoError(t, err)
require.Equal(t, 3, len(segments))
// TODO we should check 2 segments to be remote and last one to be inline
// but main satellite implementation doens't give such info at the moment
require.Len(t, segments, 2)
require.EqualValues(t, 20*memory.KiB, segments[0].PlainSize)
})
}

Expand Down
151 changes: 147 additions & 4 deletions testsuite/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"

"storj.io/common/memory"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/uplink"
"storj.io/uplink/private/testuplink"
)
Expand Down Expand Up @@ -360,10 +362,7 @@ func TestUploadEventuallyFailsWithNoNodes(t *testing.T) {
require.NoError(t, planet.StopPeer(planet.StorageNodes[i]))
}

project, err := planet.Uplinks[0].OpenProject(
testuplink.WithConcurrentSegmentUploadsDefaultConfig(ctx),
planet.Satellites[0],
)
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
require.NoError(t, err)
defer ctx.Check(project.Close)

Expand Down Expand Up @@ -449,3 +448,147 @@ func TestConcurrentUploadToSamePath(t *testing.T) {
require.Equal(t, expectedData, downloaded)
})
}

func TestUploadLimits(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 2,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.ProjectLimit.CacheCapacity = 0
},
},
}, func(t *testing.T, tpCtx *testcontext.Context, planet *testplanet.Planet) {
data := testrand.Bytes(6 * memory.KiB)

ctx := testuplink.WithMaxSegmentSize(tpCtx, 5*memory.KiB)

t.Run("segment limit", func(t *testing.T) {
upl := planet.Uplinks[0]
accountingDB := planet.Satellites[0].DB.ProjectAccounting()
err := accountingDB.UpdateProjectSegmentLimit(ctx, upl.Projects[0].ID, 0)
require.NoError(t, err)

project, err := upl.OpenProject(ctx, planet.Satellites[0])
require.NoError(t, err)
defer tpCtx.Check(project.Close)

_, err = project.CreateBucket(ctx, "testbucket")
require.NoError(t, err)

// should fail on Write beause we uploaded more than segment
// and request to satellite were made. The Write call may not fail
// immmediately, since writes are buffered and the segment uploads
// are handled concurrently.
upload, err := project.UploadObject(ctx, "testbucket", "test/path/0", nil)
require.NoError(t, err)
requireWriteEventuallyReturns(t, upload, data, uplink.ErrSegmentsLimitExceeded)
require.ErrorIs(t, upload.Commit(), uplink.ErrSegmentsLimitExceeded)

// should fail on Commit as Write input is too small to create single segment
upload, err = project.UploadObject(ctx, "testbucket", "test/path/0", nil)
require.NoError(t, err)
n, err := upload.Write(testrand.Bytes(3 * memory.KiB))
require.NoError(t, err)
require.NotZero(t, n)
require.ErrorIs(t, upload.Commit(), uplink.ErrSegmentsLimitExceeded)

// should fail on direct call to BeginObject
_, err = project.BeginUpload(ctx, "testbucket", "test/path/0", nil)
require.ErrorIs(t, err, uplink.ErrSegmentsLimitExceeded)

// update limit to be able to call BeginUpload without error
err = accountingDB.UpdateProjectSegmentLimit(ctx, upl.Projects[0].ID, 1)
require.NoError(t, err)

uploadInfo, err := project.BeginUpload(ctx, "testbucket", "test/path/0", nil)
require.NoError(t, err)

err = accountingDB.UpdateProjectSegmentLimit(ctx, upl.Projects[0].ID, 0)
require.NoError(t, err)

// should fail on Write beause we uploaded more than segment
// and request to satellite were made. The Write call may not fail
// immmediately, since writes are buffered and the segment uploads
// are handled concurrently.
partUpload, err := project.UploadPart(ctx, "testbucket", "test/path/0", uploadInfo.UploadID, 0)
require.NoError(t, err)
requireWriteEventuallyReturns(t, partUpload, data, uplink.ErrSegmentsLimitExceeded)
require.ErrorIs(t, partUpload.Commit(), uplink.ErrSegmentsLimitExceeded)

// should fail on Commit as Write input is too small to create single segment
partUpload, err = project.UploadPart(ctx, "testbucket", "test/path/0", uploadInfo.UploadID, 0)
require.NoError(t, err)
_, err = partUpload.Write(testrand.Bytes(3 * memory.KiB))
require.NoError(t, err)
require.ErrorIs(t, partUpload.Commit(), uplink.ErrSegmentsLimitExceeded)
})
t.Run("storage limit", func(t *testing.T) {
upl := planet.Uplinks[1]
accountingDB := planet.Satellites[0].DB.ProjectAccounting()
err := accountingDB.UpdateProjectUsageLimit(ctx, upl.Projects[0].ID, 0)
require.NoError(t, err)

project, err := upl.OpenProject(ctx, planet.Satellites[0])
require.NoError(t, err)
defer tpCtx.Check(project.Close)

_, err = project.CreateBucket(ctx, "testbucket")
require.NoError(t, err)

// should fail on Write beause we uploaded more than segment
// and request to satellite were made. The Write call may not fail
// immmediately, since writes are buffered and the segment uploads
// are handled concurrently.
upload, err := project.UploadObject(ctx, "testbucket", "test/path/0", nil)
require.NoError(t, err)
requireWriteEventuallyReturns(t, upload, data, uplink.ErrStorageLimitExceeded)
require.ErrorIs(t, upload.Commit(), uplink.ErrStorageLimitExceeded)

// should fail on Commit as Write input is too small to create single segment
upload, err = project.UploadObject(ctx, "testbucket", "test/path/0", nil)
require.NoError(t, err)
_, err = upload.Write(testrand.Bytes(3 * memory.KiB))
require.NoError(t, err)
require.ErrorIs(t, upload.Commit(), uplink.ErrStorageLimitExceeded)

// should fail on direct call to BeginObject
_, err = project.BeginUpload(ctx, "testbucket", "test/path/0", nil)
require.ErrorIs(t, err, uplink.ErrStorageLimitExceeded)

// update limit to be able to call BeginUpload without error
err = accountingDB.UpdateProjectUsageLimit(ctx, upl.Projects[0].ID, 1)
require.NoError(t, err)

uploadInfo, err := project.BeginUpload(ctx, "testbucket", "test/path/0", nil)
require.NoError(t, err)

err = accountingDB.UpdateProjectUsageLimit(ctx, upl.Projects[0].ID, 0)
require.NoError(t, err)

// should fail on Write beause we uploaded more than segment
// and request to satellite were made. The Write call may not fail
// immmediately, since writes are buffered and the segment uploads
// are handled concurrently.
partUpload, err := project.UploadPart(ctx, "testbucket", "test/path/0", uploadInfo.UploadID, 0)
require.NoError(t, err)
requireWriteEventuallyReturns(t, partUpload, data, uplink.ErrStorageLimitExceeded)
require.ErrorIs(t, partUpload.Commit(), uplink.ErrStorageLimitExceeded)

// should fail on Commit as Write input is too small to create single segment
partUpload, err = project.UploadPart(ctx, "testbucket", "test/path/0", uploadInfo.UploadID, 0)
require.NoError(t, err)
_, err = partUpload.Write(testrand.Bytes(3 * memory.KiB))
require.NoError(t, err)
require.ErrorIs(t, partUpload.Commit(), uplink.ErrStorageLimitExceeded)
})
})
}

func requireWriteEventuallyReturns(tb testing.TB, w io.Writer, data []byte, expectErr error) {
require.Eventually(tb, func() bool {
_, err := w.Write(data)
// only write the data on the first call to write.
data = data[0:]
return errors.Is(err, expectErr)
}, time.Second*5, time.Millisecond*10)
}

0 comments on commit a4851fc

Please sign in to comment.