From 76e95f2ce8c7cb219217798e3a85207d2fd54e25 Mon Sep 17 00:00:00 2001
From: "Ariel Shaqed (Scolnicov)"
Date: Sun, 27 Oct 2024 17:55:04 +0200
Subject: [PATCH] Use time of create MPU on backend storage as time of MPU

This time is either reported by the block adapter during creation, or it
can be read by stat'ting the underlying object after complete-MPU. Use the
latter method on S3.
---
 esti/multipart_test.go                    | 21 +++++++++++++++++++++
 pkg/api/controller.go                     |  8 ++++++++
 pkg/block/adapter.go                      |  9 ++++++---
 pkg/block/azure/multipart_block_writer.go |  1 +
 pkg/block/gs/adapter.go                   |  1 +
 pkg/block/s3/adapter.go                   |  1 +
 pkg/gateway/operations/operation_utils.go |  9 +++++++--
 pkg/gateway/operations/postobject.go      |  2 +-
 pkg/gateway/operations/putobject.go       |  3 ++-
 9 files changed, 48 insertions(+), 7 deletions(-)

diff --git a/esti/multipart_test.go b/esti/multipart_test.go
index d4cfbde85b8..907677342a0 100644
--- a/esti/multipart_test.go
+++ b/esti/multipart_test.go
@@ -6,6 +6,7 @@ import (
 	"net/http"
 	"sync"
 	"testing"
+	"time"
 
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
@@ -23,6 +24,11 @@ const (
 )
 
 func TestMultipartUpload(t *testing.T) {
+	// timeSlippage is a bound on the time difference between the local server and the S3
+	// server. It is used to verify that the "Last-Modified" time is actually close to the
+	// CreateMultipartUpload time.
+	const timeSlippage = time.Millisecond * 150
+
 	ctx, logger, repo := setupTest(t)
 	defer tearDownTest(repo)
 	file := "multipart_file"
@@ -36,6 +42,8 @@ func TestMultipartUpload(t *testing.T) {
 	require.NoError(t, err, "failed to create multipart upload")
 	logger.Info("Created multipart upload request")
 
+	uploadTime := time.Now()
+
 	parts := make([][]byte, multipartNumberOfParts)
 	var partsConcat []byte
 	for i := 0; i < multipartNumberOfParts; i++ {
@@ -44,6 +52,11 @@ func TestMultipartUpload(t *testing.T) {
 	}
 
 	completedParts := uploadMultipartParts(t, ctx, logger, resp, parts, 0)
+
+	// The object should get a Last-Modified time at around the time of MPU creation. Server
+	// times after this Sleep will be more than timeSlippage away from uploadTime.
+	time.Sleep(2 * timeSlippage)
+
 	completeResponse, err := uploadMultipartComplete(ctx, svc, resp, completedParts)
 	require.NoError(t, err, "failed to complete multipart upload")
 
@@ -55,6 +68,14 @@ func TestMultipartUpload(t *testing.T) {
 	if !bytes.Equal(partsConcat, getResp.Body) {
 		t.Fatalf("uploaded object did not match")
 	}
+
+	statResp, err := client.StatObjectWithResponse(ctx, repo, mainBranch, &apigen.StatObjectParams{Path: file})
+	require.NoError(t, err, "failed to stat object")
+	require.Equal(t, http.StatusOK, statResp.StatusCode())
+	lastModified := time.Unix(statResp.JSON200.Mtime, 0)
+	require.Less(t, lastModified.Sub(uploadTime).Abs(), timeSlippage,
+		"(remote) last modified time %s too far away from (local) upload time %s",
+		lastModified, uploadTime)
 }
 
 func TestMultipartUploadAbort(t *testing.T) {
diff --git a/pkg/api/controller.go b/pkg/api/controller.go
index c39b304e07a..5f13dd68a7c 100644
--- a/pkg/api/controller.go
+++ b/pkg/api/controller.go
@@ -384,6 +384,14 @@ func (c *Controller) CompletePresignMultipartUpload(w http.ResponseWriter, r *ht
 	writeTime := time.Now()
 	checksum := httputil.StripQuotesAndSpaces(mpuResp.ETag)
 
+	if mpuResp.MTime == nil {
+		// Falling back to the local time can be _really_ wrong when the storage layer
+		// assigns the time of MPU creation. For instance, the S3 block adapter takes care
+		// to return an MTime from headObject, to ensure that we do have a time here.
+		writeTime = time.Now()
+	} else {
+		writeTime = *mpuResp.MTime
+	}
 	entryBuilder := catalog.NewDBEntryBuilder().
 		CommonLevel(false).
 		Path(params.Path).
diff --git a/pkg/block/adapter.go b/pkg/block/adapter.go
index 684c24d9773..4945c8ac517 100644
--- a/pkg/block/adapter.go
+++ b/pkg/block/adapter.go
@@ -96,10 +96,13 @@ type CreateMultiPartUploadResponse struct {
 	ServerSideHeader http.Header
 }
 
-// CompleteMultiPartUploadResponse complete multipart etag, content length and additional headers (implementation specific) currently it targets s3.
-// The ETag is a hex string value of the content checksum
+// CompleteMultiPartUploadResponse holds the complete-multipart ETag, content length and additional headers (implementation specific).
 type CompleteMultiPartUploadResponse struct {
-	ETag             string
+	// ETag is a hex string value of the content checksum
+	ETag string
+	// MTime, if non-nil, is the creation time of the resulting object. Typically the
+	// object store returns it in a Last-Modified header from some operations.
+	MTime *time.Time
 	ContentLength    int64
 	ServerSideHeader http.Header
 }
diff --git a/pkg/block/azure/multipart_block_writer.go b/pkg/block/azure/multipart_block_writer.go
index e82b1f0dec4..72248c0a4a6 100644
--- a/pkg/block/azure/multipart_block_writer.go
+++ b/pkg/block/azure/multipart_block_writer.go
@@ -106,6 +106,7 @@ func completeMultipart(ctx context.Context, parts []block.MultipartPart, contain
 	etag := string(*res.ETag)
 	return &block.CompleteMultiPartUploadResponse{
 		ETag:          etag,
+		MTime:         res.LastModified,
 		ContentLength: size,
 	}, nil
 }
diff --git a/pkg/block/gs/adapter.go b/pkg/block/gs/adapter.go
index 43328cddeb4..704e8922658 100644
--- a/pkg/block/gs/adapter.go
+++ b/pkg/block/gs/adapter.go
@@ -524,6 +524,7 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP
 	lg.Debug("completed multipart upload")
 	return &block.CompleteMultiPartUploadResponse{
 		ETag:          targetAttrs.Etag,
+		MTime:         &targetAttrs.Created,
 		ContentLength: targetAttrs.Size,
 	}, nil
 }
diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go
index 7704d6a33ff..d9c12bbd24f 100644
--- a/pkg/block/s3/adapter.go
+++ b/pkg/block/s3/adapter.go
@@ -779,6 +779,7 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP
 	etag := strings.Trim(aws.ToString(resp.ETag), `"`)
 	return &block.CompleteMultiPartUploadResponse{
 		ETag:             etag,
+		MTime:            headResp.LastModified,
 		ContentLength:    aws.ToInt64(headResp.ContentLength),
 		ServerSideHeader: extractSSHeaderCompleteMultipartUpload(resp),
 	}, nil
diff --git a/pkg/gateway/operations/operation_utils.go b/pkg/gateway/operations/operation_utils.go
index 94fccb116ed..c9ae38c3fc9 100644
--- a/pkg/gateway/operations/operation_utils.go
+++ b/pkg/gateway/operations/operation_utils.go
@@ -40,9 +40,14 @@ func shouldReplaceMetadata(req *http.Request) bool {
 	return req.Header.Get(amzMetadataDirectiveHeaderPrefix) == "REPLACE"
 }
 
-func (o *PathOperation) finishUpload(req *http.Request, checksum, physicalAddress string, size int64, relative bool, metadata map[string]string, contentType string) error {
+func (o *PathOperation) finishUpload(req *http.Request, mTime *time.Time, checksum, physicalAddress string, size int64, relative bool, metadata map[string]string, contentType string) error {
+	var writeTime time.Time
+	if mTime == nil {
+		writeTime = time.Now()
+	} else {
+		writeTime = *mTime
+	}
 	// write metadata
-	writeTime := time.Now()
 	entry := catalog.NewDBEntryBuilder().
 		Path(o.Path).
 		RelativeAddress(relative).
diff --git a/pkg/gateway/operations/postobject.go b/pkg/gateway/operations/postobject.go
index 3cdda78258a..984512ca551 100644
--- a/pkg/gateway/operations/postobject.go
+++ b/pkg/gateway/operations/postobject.go
@@ -124,7 +124,7 @@ func (controller *PostObject) HandleCompleteMultipartUpload(w http.ResponseWrite
 		return
 	}
 	checksum := strings.Split(resp.ETag, "-")[0]
-	err = o.finishUpload(req, checksum, objName, resp.ContentLength, true, multiPart.Metadata, multiPart.ContentType)
+	err = o.finishUpload(req, resp.MTime, checksum, objName, resp.ContentLength, true, multiPart.Metadata, multiPart.ContentType)
 	if errors.Is(err, graveler.ErrWriteToProtectedBranch) {
 		_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrWriteToProtectedBranch))
 		return
diff --git a/pkg/gateway/operations/putobject.go b/pkg/gateway/operations/putobject.go
index 4fe5be16002..2ef5fffa5d9 100644
--- a/pkg/gateway/operations/putobject.go
+++ b/pkg/gateway/operations/putobject.go
@@ -309,7 +309,8 @@ func handlePut(w http.ResponseWriter, req *http.Request, o *PathOperation) {
 	// write metadata
 	metadata := amzMetaAsMetadata(req)
 	contentType := req.Header.Get("Content-Type")
-	err = o.finishUpload(req, blob.Checksum, blob.PhysicalAddress, blob.Size, true, metadata, contentType)
+	// BUG(ariels): Read MTime from upload!
+	err = o.finishUpload(req, nil, blob.Checksum, blob.PhysicalAddress, blob.Size, true, metadata, contentType)
 	if errors.Is(err, graveler.ErrWriteToProtectedBranch) {
 		_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrWriteToProtectedBranch))
 		return