Skip to content

Commit

Permalink
Use time of create MPU on backend storage as time of MPU
Browse files Browse the repository at this point in the history
This time is either reported by the block adapter during creation, or it can
be read by stat'ting the underlying object after complete-MPU.  Use the
latter method on S3.
  • Loading branch information
arielshaqed committed Oct 27, 2024
1 parent cdd548d commit 76e95f2
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 7 deletions.
21 changes: 21 additions & 0 deletions esti/multipart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"sync"
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
Expand All @@ -23,6 +24,11 @@ const (
)

func TestMultipartUpload(t *testing.T) {
// timeSlippage is a bound on the time difference between the local server and the S3
// server. It is used to verify that the "Last-Modified" time is actually close to the
// CreateMultipartUpload time.
const timeSlippage = time.Millisecond * 150

ctx, logger, repo := setupTest(t)
defer tearDownTest(repo)
file := "multipart_file"
Expand All @@ -36,6 +42,8 @@ func TestMultipartUpload(t *testing.T) {
require.NoError(t, err, "failed to create multipart upload")
logger.Info("Created multipart upload request")

uploadTime := time.Now()

parts := make([][]byte, multipartNumberOfParts)
var partsConcat []byte
for i := 0; i < multipartNumberOfParts; i++ {
Expand All @@ -44,6 +52,11 @@ func TestMultipartUpload(t *testing.T) {
}

completedParts := uploadMultipartParts(t, ctx, logger, resp, parts, 0)

// Object should have Last-Modified time at around time of MPU creation. Server times
// after this Sleep will be more than timeSlippage away from uploadTime.
time.Sleep(2 * timeSlippage)

completeResponse, err := uploadMultipartComplete(ctx, svc, resp, completedParts)
require.NoError(t, err, "failed to complete multipart upload")

Expand All @@ -55,6 +68,14 @@ func TestMultipartUpload(t *testing.T) {
if !bytes.Equal(partsConcat, getResp.Body) {
t.Fatalf("uploaded object did not match")
}

statResp, err := client.StatObjectWithResponse(ctx, repo, mainBranch, &apigen.StatObjectParams{Path: file})
require.NoError(t, err, "failed to get object")
require.Equal(t, http.StatusOK, getResp.StatusCode())
lastModified := time.Unix(statResp.JSON200.Mtime, 0)
require.Less(lastModified.Add(-uploadTime).Abs(), timeSlippage,

Check failure on line 76 in esti/multipart_test.go

View workflow job for this annotation

GitHub Actions / Run Go tests

invalid operation: operator - not defined on uploadTime (variable of type "time".Time)

Check failure on line 76 in esti/multipart_test.go

View workflow job for this annotation

GitHub Actions / Run Go tests

lastModified.Add(-uploadTime).Abs undefined (type "time".Time has no field or method Abs)

Check failure on line 76 in esti/multipart_test.go

View workflow job for this annotation

GitHub Actions / Test unified gc

invalid operation: operator - not defined on uploadTime (variable of type "time".Time)

Check failure on line 76 in esti/multipart_test.go

View workflow job for this annotation

GitHub Actions / Test unified gc

lastModified.Add(-uploadTime).Abs undefined (type "time".Time has no field or method Abs)
"(remote) last modified time %s too far away from (local) upload time %s",
lastModified, uploadTime)
}

func TestMultipartUploadAbort(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,14 @@ func (c *Controller) CompletePresignMultipartUpload(w http.ResponseWriter, r *ht

writeTime := time.Now()

Check failure on line 385 in pkg/api/controller.go

View workflow job for this annotation

GitHub Actions / Run Linters and Checkers

ineffectual assignment to writeTime (ineffassign)
checksum := httputil.StripQuotesAndSpaces(mpuResp.ETag)
if mpuResp.MTime == nil {
// This can be _really_ wrong when the storage layer assigns the time of MPU
// creation. For instance, the S3 block adapter takes sure to return an MTime
// from headObject to ensure that we do have a time here.
writeTime = time.Now()
} else {
writeTime = *mpuResp.MTime
}
entryBuilder := catalog.NewDBEntryBuilder().
CommonLevel(false).
Path(params.Path).
Expand Down
9 changes: 6 additions & 3 deletions pkg/block/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,13 @@ type CreateMultiPartUploadResponse struct {
ServerSideHeader http.Header
}

// CompleteMultiPartUploadResponse complete multipart etag, content length and additional headers (implementation specific) currently it targets s3.
// The ETag is a hex string value of the content checksum
// CompleteMultiPartUploadResponse complete multipart etag, content length and additional headers (implementation specific).
type CompleteMultiPartUploadResponse struct {
ETag string
// ETag is a hex string value of the content checksum
ETag string
// MTime, if non-nil, is the creation time of the resulting object. Typically the
// object store returns it on a Last-Modified header from some operations.
MTime *time.Time
ContentLength int64
ServerSideHeader http.Header
}
Expand Down
1 change: 1 addition & 0 deletions pkg/block/azure/multipart_block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func completeMultipart(ctx context.Context, parts []block.MultipartPart, contain
etag := string(*res.ETag)
return &block.CompleteMultiPartUploadResponse{
ETag: etag,
MTime: res.LastModified,
ContentLength: size,
}, nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/block/gs/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP
lg.Debug("completed multipart upload")
return &block.CompleteMultiPartUploadResponse{
ETag: targetAttrs.Etag,
MTime: &targetAttrs.Created,
ContentLength: targetAttrs.Size,
}, nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/block/s3/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP
etag := strings.Trim(aws.ToString(resp.ETag), `"`)
return &block.CompleteMultiPartUploadResponse{
ETag: etag,
MTime: headResp.LastModified,
ContentLength: aws.ToInt64(headResp.ContentLength),
ServerSideHeader: extractSSHeaderCompleteMultipartUpload(resp),
}, nil
Expand Down
9 changes: 7 additions & 2 deletions pkg/gateway/operations/operation_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,14 @@ func shouldReplaceMetadata(req *http.Request) bool {
return req.Header.Get(amzMetadataDirectiveHeaderPrefix) == "REPLACE"
}

func (o *PathOperation) finishUpload(req *http.Request, checksum, physicalAddress string, size int64, relative bool, metadata map[string]string, contentType string) error {
func (o *PathOperation) finishUpload(req *http.Request, mTime *time.Time, checksum, physicalAddress string, size int64, relative bool, metadata map[string]string, contentType string) error {
var writeTime time.Time
if mTime == nil {
writeTime = time.Now()
} else {
writeTime = *mTime
}
// write metadata
writeTime := time.Now()
entry := catalog.NewDBEntryBuilder().
Path(o.Path).
RelativeAddress(relative).
Expand Down
2 changes: 1 addition & 1 deletion pkg/gateway/operations/postobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (controller *PostObject) HandleCompleteMultipartUpload(w http.ResponseWrite
return
}
checksum := strings.Split(resp.ETag, "-")[0]
err = o.finishUpload(req, checksum, objName, resp.ContentLength, true, multiPart.Metadata, multiPart.ContentType)
err = o.finishUpload(req, resp.MTime, checksum, objName, resp.ContentLength, true, multiPart.Metadata, multiPart.ContentType)
if errors.Is(err, graveler.ErrWriteToProtectedBranch) {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrWriteToProtectedBranch))
return
Expand Down
3 changes: 2 additions & 1 deletion pkg/gateway/operations/putobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ func handlePut(w http.ResponseWriter, req *http.Request, o *PathOperation) {
// write metadata
metadata := amzMetaAsMetadata(req)
contentType := req.Header.Get("Content-Type")
err = o.finishUpload(req, blob.Checksum, blob.PhysicalAddress, blob.Size, true, metadata, contentType)
// BUG(ariels): Read MTime from upload!
err = o.finishUpload(req, nil, blob.Checksum, blob.PhysicalAddress, blob.Size, true, metadata, contentType)
if errors.Is(err, graveler.ErrWriteToProtectedBranch) {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrWriteToProtectedBranch))
return
Expand Down

0 comments on commit 76e95f2

Please sign in to comment.