From a4851fc927bb72b0bb81f06b53ca44eda4238e14 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Mon, 11 Sep 2023 21:00:55 +0200 Subject: [PATCH] uplink: enable new concurrent segment upload codepath by default This change enables the new concurrent segment upload codepath by default. It's still possible to disable it with a context created with testuplink.DisableConcurrentSegmentUploads. Additionally, this change fixes a small Jenkins issue. Change-Id: I26229bb0f071edc6433bfdcde404e00be1ea35ab --- Jenkinsfile | 5 +- private/testuplink/uplink.go | 19 ++- testsuite/access_test.go | 17 +-- testsuite/object_test.go | 5 +- testsuite/uplink/metainfo/uplink_test.go | 12 +- testsuite/upload_test.go | 151 ++++++++++++++++++++++- 6 files changed, 176 insertions(+), 33 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 5b0d3a27..8f172744 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -154,15 +154,12 @@ pipeline { dir('testsuite'){ sh 'cp go.mod go-temp.mod' sh 'go vet -modfile go-temp.mod -mod=mod storj.io/storj/...' - sh 'go test -modfile go-temp.mod -mod=mod -parallel 4 -p 6 -vet=off -timeout 20m -json storj.io/storj/... > ../.build/testsuite-storj.json' + sh 'go test -modfile go-temp.mod -mod=mod -tags noembed -parallel 4 -p 6 -vet=off -timeout 20m -json storj.io/storj/... 2>&1 | tee ../.build/testsuite-storj.json | xunit -out ../.build/testsuite-storj.xml' } } post { always { - dir('testsuite'){ - sh 'cat ../.build/testsuite-storj.json | xunit -out ../.build/testsuite-storj.xml' - } sh script: 'cat .build/testsuite-storj.json | tparse -all -top -slow 100', returnStatus: true archiveArtifacts artifacts: '.build/testsuite-storj.json' junit '.build/testsuite-storj.xml' diff --git a/private/testuplink/uplink.go b/private/testuplink/uplink.go index 844a5732..76d75870 100644 --- a/private/testuplink/uplink.go +++ b/private/testuplink/uplink.go @@ -22,6 +22,8 @@ type listLimitKey struct{} type concurrentSegmentUploadsConfigKey struct{} +type disableConcurrentSegmentUploadsKey struct{} + type ( logWriterKey struct{} logWriterContextKey struct{} @@ -111,14 +113,25 @@ func WithConcurrentSegmentUploadsConfig(ctx context.Context, config ConcurrentSe return context.WithValue(ctx, concurrentSegmentUploadsConfigKey{}, config) } +// DisableConcurrentSegmentUploads creates a context that disables the new +// concurrent segment upload codepath. +func DisableConcurrentSegmentUploads(ctx context.Context) context.Context { + return context.WithValue(ctx, disableConcurrentSegmentUploadsKey{}, struct{}{}) +} + // GetConcurrentSegmentUploadsConfig returns the scheduler options to -// use with the new concurrent segment upload codepath, or nil if no scheduler -// options have been set. +// use with the new concurrent segment upload codepath. If no scheduler +// options have been set, it returns the default configuration. The concurrent +// segment upload codepath can be disabled with DisableConcurrentSegmentUploads. func GetConcurrentSegmentUploadsConfig(ctx context.Context) *ConcurrentSegmentUploadsConfig { + if value := ctx.Value(disableConcurrentSegmentUploadsKey{}); value != nil { + return nil + } if config, ok := ctx.Value(concurrentSegmentUploadsConfigKey{}).(ConcurrentSegmentUploadsConfig); ok { return &config } - return nil + config := DefaultConcurrentSegmentUploadsConfig() + return &config } // WithLogWriter creates context with information about upload log file.
diff --git a/testsuite/access_test.go b/testsuite/access_test.go index 491efb0c..9fe03616 100644 --- a/testsuite/access_test.go +++ b/testsuite/access_test.go @@ -618,16 +618,11 @@ func TestUploadNotAllowedPath(t *testing.T) { testData := bytes.NewBuffer(testrand.Bytes(1 * memory.KiB)) - upload, err := project.UploadObject(ctx, "testbucket", "first-level-object", nil) - require.NoError(t, err) - - _, err = io.Copy(upload, testData) + _, err = project.UploadObject(ctx, "testbucket", "first-level-object", nil) require.Error(t, err) + require.ErrorIs(t, err, uplink.ErrPermissionDenied) - err = upload.Abort() - require.NoError(t, err) - - upload, err = project.UploadObject(ctx, "testbucket", "videos/second-level-object", nil) + upload, err := project.UploadObject(ctx, "testbucket", "videos/second-level-object", nil) require.NoError(t, err) _, err = io.Copy(upload, testData) @@ -876,11 +871,9 @@ func TestImmutableUpload(t *testing.T) { } { // we shouldn't be able upload to a different location - upload, err := project.UploadObject(ctx, "testbucket", "object2", nil) - require.NoError(t, err) - _, err = upload.Write(testrand.Bytes(5 * memory.KiB)) + _, err := project.UploadObject(ctx, "testbucket", "object2", nil) require.Error(t, err) - require.Error(t, upload.Commit()) + require.ErrorIs(t, err, uplink.ErrPermissionDenied) } // we shouldn't be able to delete diff --git a/testsuite/object_test.go b/testsuite/object_test.go index 0df4fd97..525fe9b1 100644 --- a/testsuite/object_test.go +++ b/testsuite/object_test.go @@ -16,7 +16,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" - "storj.io/common/errs2" "storj.io/common/fpath" "storj.io/common/memory" "storj.io/common/testcontext" @@ -386,9 +385,7 @@ func TestContextCancelUpload(t *testing.T) { assertObjectEmptyCreated(t, upload.Info(), "test.dat") uploadcancel() - _, err = upload.Write(randData) - require.Error(t, err) - require.True(t, errs2.IsCanceled(err)) + requireWriteEventuallyReturns(t, upload, randData, context.Canceled) err = upload.Abort() require.NoError(t, err) diff --git a/testsuite/uplink/metainfo/uplink_test.go b/testsuite/uplink/metainfo/uplink_test.go index c4f42b3c..6d624502 100644 --- a/testsuite/uplink/metainfo/uplink_test.go +++ b/testsuite/uplink/metainfo/uplink_test.go @@ -16,8 +16,7 @@ import ( "storj.io/uplink/private/testuplink" ) -func TestMultisegmentUploadWithLastInline(t *testing.T) { - // this is special case were uploaded object has 3 segments (2 remote + 1 inline) +func TestMultisegmentUploadWithoutInlineSegment(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ @@ -36,7 +35,9 @@ func TestMultisegmentUploadWithLastInline(t *testing.T) { require.NoError(t, err) require.Equal(t, expectedData, downloaded) - // verify that object has 3 segments, 2 remote + 1 inline + // in the past an object whose size was a multiple of the max segment size was uploaded + // as remote segments plus one additional inline segment; after the upload codepath refactor + // we now upload only 2 remote segments, without the trailing inline segment objects, err := planet.Satellites[0].Metabase.DB.TestingAllCommittedObjects(ctx, planet.Uplinks[0].Projects[0].ID, "testbucket") require.NoError(t, err) require.Len(t, objects, 1) @@ -47,9 +48,8 @@ func TestMultisegmentUploadWithLastInline(t *testing.T) { ObjectKey: objects[0].ObjectKey, }) require.NoError(t, err) - require.Equal(t, 3, len(segments)) - // TODO we should check 2 segments to be
remote and last one to be inline - // but main satellite implementation doens't give such info at the moment + require.Len(t, segments, 2) + require.EqualValues(t, 20*memory.KiB, segments[0].PlainSize) }) } diff --git a/testsuite/upload_test.go b/testsuite/upload_test.go index c44d9b44..1d8e2669 100644 --- a/testsuite/upload_test.go +++ b/testsuite/upload_test.go @@ -13,11 +13,13 @@ import ( "time" "github.com/stretchr/testify/require" + "go.uber.org/zap" "storj.io/common/memory" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testplanet" + "storj.io/storj/satellite" "storj.io/uplink" "storj.io/uplink/private/testuplink" ) @@ -360,10 +362,7 @@ func TestUploadEventuallyFailsWithNoNodes(t *testing.T) { require.NoError(t, planet.StopPeer(planet.StorageNodes[i])) } - project, err := planet.Uplinks[0].OpenProject( - testuplink.WithConcurrentSegmentUploadsDefaultConfig(ctx), - planet.Satellites[0], - ) + project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0]) require.NoError(t, err) defer ctx.Check(project.Close) @@ -449,3 +448,147 @@ func TestConcurrentUploadToSamePath(t *testing.T) { require.Equal(t, expectedData, downloaded) }) } + +func TestUploadLimits(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 2, + Reconfigure: testplanet.Reconfigure{ + Satellite: func(log *zap.Logger, index int, config *satellite.Config) { + config.ProjectLimit.CacheCapacity = 0 + }, + }, + }, func(t *testing.T, tpCtx *testcontext.Context, planet *testplanet.Planet) { + data := testrand.Bytes(6 * memory.KiB) + + ctx := testuplink.WithMaxSegmentSize(tpCtx, 5*memory.KiB) + + t.Run("segment limit", func(t *testing.T) { + upl := planet.Uplinks[0] + accountingDB := planet.Satellites[0].DB.ProjectAccounting() + err := accountingDB.UpdateProjectSegmentLimit(ctx, upl.Projects[0].ID, 0) + require.NoError(t, err) + + project, err := upl.OpenProject(ctx, planet.Satellites[0]) + require.NoError(t, err) + defer tpCtx.Check(project.Close) + + _, err = project.CreateBucket(ctx, "testbucket") + require.NoError(t, err) + + // should fail on Write because we uploaded more than a segment's + // worth of data and requests to the satellite were made. The Write call may not fail + // immediately, since writes are buffered and the segment uploads + // are handled concurrently.
+ upload, err := project.UploadObject(ctx, "testbucket", "test/path/0", nil) + require.NoError(t, err) + requireWriteEventuallyReturns(t, upload, data, uplink.ErrSegmentsLimitExceeded) + require.ErrorIs(t, upload.Commit(), uplink.ErrSegmentsLimitExceeded) + + // should fail on Commit as Write input is too small to create a single segment + upload, err = project.UploadObject(ctx, "testbucket", "test/path/0", nil) + require.NoError(t, err) + n, err := upload.Write(testrand.Bytes(3 * memory.KiB)) + require.NoError(t, err) + require.NotZero(t, n) + require.ErrorIs(t, upload.Commit(), uplink.ErrSegmentsLimitExceeded) + + // should fail on direct call to BeginObject + _, err = project.BeginUpload(ctx, "testbucket", "test/path/0", nil) + require.ErrorIs(t, err, uplink.ErrSegmentsLimitExceeded) + + // update limit to be able to call BeginUpload without error + err = accountingDB.UpdateProjectSegmentLimit(ctx, upl.Projects[0].ID, 1) + require.NoError(t, err) + + uploadInfo, err := project.BeginUpload(ctx, "testbucket", "test/path/0", nil) + require.NoError(t, err) + + err = accountingDB.UpdateProjectSegmentLimit(ctx, upl.Projects[0].ID, 0) + require.NoError(t, err) + + // should fail on Write because we uploaded more than a segment's + // worth of data and requests to the satellite were made. The Write call may not fail + // immediately, since writes are buffered and the segment uploads + // are handled concurrently. + partUpload, err := project.UploadPart(ctx, "testbucket", "test/path/0", uploadInfo.UploadID, 0) + require.NoError(t, err) + requireWriteEventuallyReturns(t, partUpload, data, uplink.ErrSegmentsLimitExceeded) + require.ErrorIs(t, partUpload.Commit(), uplink.ErrSegmentsLimitExceeded) + + // should fail on Commit as Write input is too small to create a single segment + partUpload, err = project.UploadPart(ctx, "testbucket", "test/path/0", uploadInfo.UploadID, 0) + require.NoError(t, err) + _, err = partUpload.Write(testrand.Bytes(3 * memory.KiB)) + require.NoError(t, err) + require.ErrorIs(t, partUpload.Commit(), uplink.ErrSegmentsLimitExceeded) + }) + t.Run("storage limit", func(t *testing.T) { + upl := planet.Uplinks[1] + accountingDB := planet.Satellites[0].DB.ProjectAccounting() + err := accountingDB.UpdateProjectUsageLimit(ctx, upl.Projects[0].ID, 0) + require.NoError(t, err) + + project, err := upl.OpenProject(ctx, planet.Satellites[0]) + require.NoError(t, err) + defer tpCtx.Check(project.Close) + + _, err = project.CreateBucket(ctx, "testbucket") + require.NoError(t, err) + + // should fail on Write because we uploaded more than a segment's + // worth of data and requests to the satellite were made. The Write call may not fail + // immediately, since writes are buffered and the segment uploads + // are handled concurrently.
+ upload, err := project.UploadObject(ctx, "testbucket", "test/path/0", nil) + require.NoError(t, err) + requireWriteEventuallyReturns(t, upload, data, uplink.ErrStorageLimitExceeded) + require.ErrorIs(t, upload.Commit(), uplink.ErrStorageLimitExceeded) + + // should fail on Commit as Write input is too small to create a single segment + upload, err = project.UploadObject(ctx, "testbucket", "test/path/0", nil) + require.NoError(t, err) + _, err = upload.Write(testrand.Bytes(3 * memory.KiB)) + require.NoError(t, err) + require.ErrorIs(t, upload.Commit(), uplink.ErrStorageLimitExceeded) + + // should fail on direct call to BeginObject + _, err = project.BeginUpload(ctx, "testbucket", "test/path/0", nil) + require.ErrorIs(t, err, uplink.ErrStorageLimitExceeded) + + // update limit to be able to call BeginUpload without error + err = accountingDB.UpdateProjectUsageLimit(ctx, upl.Projects[0].ID, 1) + require.NoError(t, err) + + uploadInfo, err := project.BeginUpload(ctx, "testbucket", "test/path/0", nil) + require.NoError(t, err) + + err = accountingDB.UpdateProjectUsageLimit(ctx, upl.Projects[0].ID, 0) + require.NoError(t, err) + + // should fail on Write because we uploaded more than a segment's + // worth of data and requests to the satellite were made. The Write call may not fail + // immediately, since writes are buffered and the segment uploads + // are handled concurrently. + partUpload, err := project.UploadPart(ctx, "testbucket", "test/path/0", uploadInfo.UploadID, 0) + require.NoError(t, err) + requireWriteEventuallyReturns(t, partUpload, data, uplink.ErrStorageLimitExceeded) + require.ErrorIs(t, partUpload.Commit(), uplink.ErrStorageLimitExceeded) + + // should fail on Commit as Write input is too small to create a single segment + partUpload, err = project.UploadPart(ctx, "testbucket", "test/path/0", uploadInfo.UploadID, 0) + require.NoError(t, err) + _, err = partUpload.Write(testrand.Bytes(3 * memory.KiB)) + require.NoError(t, err) + require.ErrorIs(t, partUpload.Commit(), uplink.ErrStorageLimitExceeded) + }) + }) +} + +func requireWriteEventuallyReturns(tb testing.TB, w io.Writer, data []byte, expectErr error) { + require.Eventually(tb, func() bool { + _, err := w.Write(data) + // only write the data on the first call; later calls pass an empty slice. + data = data[:0] + return errors.Is(err, expectErr) + }, time.Second*5, time.Millisecond*10) +}
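Note, not part of the patch: a minimal sketch of how a test could opt back into the old serial upload codepath after this change, using the DisableConcurrentSegmentUploads helper added above. It assumes the same package and imports as testsuite/upload_test.go; the test name, bucket name, and object key are illustrative only.

func TestSerialUploadFallback(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
	}, func(t *testing.T, tpCtx *testcontext.Context, planet *testplanet.Planet) {
		// Opt out of the new concurrent segment upload codepath; with this
		// context GetConcurrentSegmentUploadsConfig returns nil and the
		// previous serial upload path is used.
		ctx := testuplink.DisableConcurrentSegmentUploads(tpCtx)

		project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
		require.NoError(t, err)
		defer tpCtx.Check(project.Close)

		_, err = project.CreateBucket(ctx, "testbucket")
		require.NoError(t, err)

		// A regular upload still works; only the internal codepath differs.
		upload, err := project.UploadObject(ctx, "testbucket", "object", nil)
		require.NoError(t, err)
		_, err = upload.Write(testrand.Bytes(1 * memory.KiB))
		require.NoError(t, err)
		require.NoError(t, upload.Commit())
	})
}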