diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 9972cad616e..ad8267d50ac 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -4701,16 +4701,8 @@ func (c *Controller) UpdateObjectUserMetadata(w http.ResponseWriter, r *http.Req ctx := r.Context() c.LogAction(ctx, "update_object_user_metadata", r, repository, branch, "") - // read all the _other_ metadata. Does not require checking read - // permissions, as the caller will never see this. - entry, err := c.Catalog.GetEntry(ctx, repository, branch, params.Path, catalog.GetEntryParams{}) - if c.handleAPIError(ctx, w, r, err) { - return - } - - entry.Metadata = catalog.Metadata(body.Set.AdditionalProperties) - - err = c.Catalog.CreateEntry(ctx, repository, branch, *entry) + newUserMetadata := body.Set.AdditionalProperties + err := c.Catalog.UpdateEntryUserMetadata(ctx, repository, branch, params.Path, newUserMetadata) if c.handleAPIError(ctx, w, r, err) { return } diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index bd9aefaaf9a..78135dc1c4b 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -2299,47 +2299,73 @@ func TestController_UpdateObjectUserMetadataHander(t *testing.T) { t.Fatal(err) } - t.Run("update metadata", func(t *testing.T) { - const objPath = "foo/bar" - entry := catalog.DBEntry{ - Path: objPath, - PhysicalAddress: "this_is_bars_address", - CreationDate: time.Now(), - Size: 666, - Checksum: "this_is_a_checksum", - } - testutil.Must(t, deps.catalog.CreateEntry(ctx, repo, "main", entry)) + bools := []bool{false, true} - userMetadataMap := map[string]string{ - "foo": "bar", - "baz": "quux", + for _, doCommit := range bools { + commitLabel := "no commit" + if doCommit { + commitLabel = "commit" } + for _, doSetMetadata := range bools { + metadataLabel := "no metadata" + if doSetMetadata { + metadataLabel = "initial metadata" + } - body := apigen.UpdateObjectUserMetadataJSONRequestBody{ - Set: apigen.ObjectUserMetadata{ - AdditionalProperties: userMetadataMap, - }, - } + label := fmt.Sprintf("%s, %s", commitLabel, metadataLabel) + t.Run(label, func(t *testing.T) { + const objPath = "foo/bar" + entry := catalog.DBEntry{ + Path: objPath, + PhysicalAddress: "this_is_bars_address", + CreationDate: time.Now(), + Size: 666, + Checksum: "this_is_a_checksum", + } + if doSetMetadata { + entry.Metadata = catalog.Metadata{ + "old": "metadata", + } + } + testutil.Must(t, deps.catalog.CreateEntry(ctx, repo, "main", entry)) - resp, err := clt.UpdateObjectUserMetadataWithResponse(ctx, repo, "main", - &apigen.UpdateObjectUserMetadataParams{Path: objPath}, - body, - ) - verifyResponseOK(t, resp, err) + if doCommit { + _, err := deps.catalog.Commit(ctx, repo, "main", "First commit!", t.Name(), nil, nil, nil, false) + testutil.MustDo(t, "Commit", err) + } - // Verify that it was set - statResp, err := clt.StatObjectWithResponse(ctx, repo, "main", - &apigen.StatObjectParams{ - Path: objPath, - UserMetadata: swag.Bool(true), - }, - ) - verifyResponseOK(t, statResp, err) - objectStats := statResp.JSON200 - if diffs := deep.Equal(objectStats.Metadata.AdditionalProperties, userMetadataMap); diffs != nil { - t.Errorf("did not get expected metadata, diffs %s", diffs) + userMetadataMap := map[string]string{ + "foo": "bar", + "baz": "quux", + } + + body := apigen.UpdateObjectUserMetadataJSONRequestBody{ + Set: apigen.ObjectUserMetadata{ + AdditionalProperties: userMetadataMap, + }, + } + + resp, err := clt.UpdateObjectUserMetadataWithResponse(ctx, repo, "main", + &apigen.UpdateObjectUserMetadataParams{Path: objPath}, + body, + ) + verifyResponseOK(t, resp, err) + + // Verify that it was set + statResp, err := clt.StatObjectWithResponse(ctx, repo, "main", + &apigen.StatObjectParams{ + Path: objPath, + UserMetadata: swag.Bool(true), + }, + ) + verifyResponseOK(t, statResp, err) + objectStats := statResp.JSON200 + if diffs := deep.Equal(objectStats.Metadata.AdditionalProperties, userMetadataMap); diffs != nil { + t.Errorf("did not get expected metadata, diffs %s", diffs) + } + }) } - }) + } t.Run("update metadata not found", func(t *testing.T) { const objPath = "foo/not/found/bar" diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go index 1ffd22c5eeb..69fa1a1c452 100644 --- a/pkg/catalog/catalog.go +++ b/pkg/catalog/catalog.go @@ -1000,6 +1000,39 @@ func (c *Catalog) GetEntry(ctx context.Context, repositoryID string, reference s return &catalogEntry, nil } +// UpdateEntryUserMetadata updates user metadata for the current entry for a +// path in repository branch reference. +func (c *Catalog) UpdateEntryUserMetadata(ctx context.Context, repositoryID, branch, path string, newUserMetadata map[string]string) error { + branchID := graveler.BranchID(branch) + if err := validator.Validate([]validator.ValidateArg{ + {Name: "repository", Value: repositoryID, Fn: graveler.ValidateRepositoryID}, + {Name: "branch", Value: branchID, Fn: graveler.ValidateBranchID}, + {Name: "path", Value: Path(path), Fn: ValidatePath}, + }); err != nil { + return err + } + + repository, err := c.getRepository(ctx, repositoryID) + if err != nil { + return nil + } + + key := graveler.Key(path) + updater := graveler.ValueUpdateFunc(func(value *graveler.Value) (*graveler.Value, error) { + if value == nil { + return nil, fmt.Errorf("update user metadata on %s/%s/%s: %w", + repositoryID, branchID, path, graveler.ErrNotFound) + } + entry, err := ValueToEntry(value) + if err != nil { + return nil, err + } + entry.Metadata = newUserMetadata + return EntryToValue(entry) + }) + return c.Store.Update(ctx, repository, branchID, key, updater) +} + func newEntryFromCatalogEntry(entry DBEntry) *Entry { ent := &Entry{ Address: entry.PhysicalAddress, diff --git a/pkg/catalog/fake_graveler_test.go b/pkg/catalog/fake_graveler_test.go index 22f8b28ce3b..75569bc260e 100644 --- a/pkg/catalog/fake_graveler_test.go +++ b/pkg/catalog/fake_graveler_test.go @@ -116,6 +116,19 @@ func (g *FakeGraveler) Set(_ context.Context, repository *graveler.RepositoryRec return nil } +func (g *FakeGraveler) Update(ctx context.Context, repository *graveler.RepositoryRecord, branchID graveler.BranchID, key graveler.Key, update graveler.ValueUpdateFunc, opts ...graveler.SetOptionsFunc) error { + if g.Err != nil { + return g.Err + } + k := fakeGravelerBuildKey(repository.RepositoryID, graveler.Ref(branchID.String()), key) + value, err := update(g.KeyValue[k]) + if err != nil { + return err + } + g.KeyValue[k] = value + return nil +} + func (g *FakeGraveler) Delete(ctx context.Context, repository *graveler.RepositoryRecord, branchID graveler.BranchID, key graveler.Key, _ ...graveler.SetOptionsFunc) error { return nil } diff --git a/pkg/graveler/graveler.go b/pkg/graveler/graveler.go index 2b8d9af30b7..3edeacb68f8 100644 --- a/pkg/graveler/graveler.go +++ b/pkg/graveler/graveler.go @@ -541,6 +541,10 @@ type KeyValueStore interface { // Set stores value on repository / branch by key. nil value is a valid value for tombstone Set(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, value Value, opts ...SetOptionsFunc) error + // Update atomically runs update on repository / branch by key. (Of course, if entry + // is only on committed, the updated entry will still be created (atomically) on staging.) + Update(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, update ValueUpdateFunc, opts ...SetOptionsFunc) error + // Delete value from repository / branch by key Delete(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, opts ...SetOptionsFunc) error @@ -1793,8 +1797,7 @@ func (g *Graveler) Set(ctx context.Context, repository *RepositoryRecord, branch } // safeBranchWrite repeatedly attempts to perform stagingOperation, retrying -// if the staging token changes during the write. It never backs off. It -// returns the number of times it tried -- between 1 and options.MaxTries. +// if the staging token changes during the write. It never backs off. func (g *Graveler) safeBranchWrite(ctx context.Context, log logging.Logger, repository *RepositoryRecord, branchID BranchID, options safeBranchWriteOptions, stagingOperation func(branch *Branch) error, operation string, ) error { @@ -1841,6 +1844,47 @@ func (g *Graveler) safeBranchWrite(ctx context.Context, log logging.Logger, repo return nil } +func (g *Graveler) Update(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, update ValueUpdateFunc, opts ...SetOptionsFunc) error { + isProtected, err := g.protectedBranchesManager.IsBlocked(ctx, repository, branchID, BranchProtectionBlockedAction_STAGING_WRITE) + if err != nil { + return err + } + if isProtected { + return ErrWriteToProtectedBranch + } + + options := NewSetOptions(opts) + if repository.ReadOnly && !options.Force { + return ErrReadOnlyRepository + } + + log := g.log(ctx).WithFields(logging.Fields{"key": key, "operation": "update_user_metadata"}) + + // committedValue, if non-nil is a value read from either uncommitted or committed. Usually + // it is read from committed. If there is a value on staging, that entry will be modified + // and committedValue will never be read. + var committedValue *Value + + err = g.safeBranchWrite(ctx, log, repository, branchID, safeBranchWriteOptions{MaxTries: options.MaxTries}, func(branch *Branch) error { + return g.StagingManager.Update(ctx, branch.StagingToken, key, func(currentValue *Value) (*Value, error) { + if currentValue == nil { + // Object not on staging: need to update committed value. + if committedValue == nil { + committedValue, err = g.Get(ctx, repository, Ref(branchID), key) + if err != nil { + // (Includes ErrNotFound) + return nil, fmt.Errorf("read from committed: %w", err) + } + } + // Get always returns a non-nil value or an error. + currentValue = committedValue + } + return update(currentValue) + }) + }, "update_metadata") + return err +} + func (g *Graveler) Delete(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, opts ...SetOptionsFunc) error { isProtected, err := g.protectedBranchesManager.IsBlocked(ctx, repository, branchID, BranchProtectionBlockedAction_STAGING_WRITE) if err != nil { diff --git a/pkg/graveler/mock/graveler.go b/pkg/graveler/mock/graveler.go index c80230b11be..dbd2c30d1d3 100644 --- a/pkg/graveler/mock/graveler.go +++ b/pkg/graveler/mock/graveler.go @@ -158,6 +158,25 @@ func (mr *MockKeyValueStoreMockRecorder) Set(ctx, repository, branchID, key, val return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Set", reflect.TypeOf((*MockKeyValueStore)(nil).Set), varargs...) } +// Update mocks base method. +func (m *MockKeyValueStore) Update(ctx context.Context, repository *graveler.RepositoryRecord, branchID graveler.BranchID, key graveler.Key, update graveler.ValueUpdateFunc, opts ...graveler.SetOptionsFunc) error { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, repository, branchID, key, update} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Update", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// Update indicates an expected call of Update. +func (mr *MockKeyValueStoreMockRecorder) Update(ctx, repository, branchID, key, update interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, repository, branchID, key, update}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockKeyValueStore)(nil).Update), varargs...) +} + // MockVersionController is a mock of VersionController interface. type MockVersionController struct { ctrl *gomock.Controller