Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make update object metadata API atomic #8264

Merged
merged 6 commits into from
Oct 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4701,16 +4701,8 @@ func (c *Controller) UpdateObjectUserMetadata(w http.ResponseWriter, r *http.Req
ctx := r.Context()
c.LogAction(ctx, "update_object_user_metadata", r, repository, branch, "")

// read all the _other_ metadata. Does not require checking read
// permissions, as the caller will never see this.
entry, err := c.Catalog.GetEntry(ctx, repository, branch, params.Path, catalog.GetEntryParams{})
if c.handleAPIError(ctx, w, r, err) {
return
}

entry.Metadata = catalog.Metadata(body.Set.AdditionalProperties)

err = c.Catalog.CreateEntry(ctx, repository, branch, *entry)
newUserMetadata := body.Set.AdditionalProperties
err := c.Catalog.UpdateEntryUserMetadata(ctx, repository, branch, params.Path, newUserMetadata)
if c.handleAPIError(ctx, w, r, err) {
return
}
Expand Down
96 changes: 61 additions & 35 deletions pkg/api/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2299,47 +2299,73 @@ func TestController_UpdateObjectUserMetadataHander(t *testing.T) {
t.Fatal(err)
}

t.Run("update metadata", func(t *testing.T) {
const objPath = "foo/bar"
entry := catalog.DBEntry{
Path: objPath,
PhysicalAddress: "this_is_bars_address",
CreationDate: time.Now(),
Size: 666,
Checksum: "this_is_a_checksum",
}
testutil.Must(t, deps.catalog.CreateEntry(ctx, repo, "main", entry))
bools := []bool{false, true}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice tests!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:-)

As you can imagine, I added the loops only after writing the third of the four tests 🤡 .


userMetadataMap := map[string]string{
"foo": "bar",
"baz": "quux",
for _, doCommit := range bools {
commitLabel := "no commit"
if doCommit {
commitLabel = "commit"
}
for _, doSetMetadata := range bools {
metadataLabel := "no metadata"
if doSetMetadata {
metadataLabel = "initial metadata"
}

body := apigen.UpdateObjectUserMetadataJSONRequestBody{
Set: apigen.ObjectUserMetadata{
AdditionalProperties: userMetadataMap,
},
}
label := fmt.Sprintf("%s, %s", commitLabel, metadataLabel)
t.Run(label, func(t *testing.T) {
const objPath = "foo/bar"
entry := catalog.DBEntry{
Path: objPath,
PhysicalAddress: "this_is_bars_address",
CreationDate: time.Now(),
Size: 666,
Checksum: "this_is_a_checksum",
}
if doSetMetadata {
entry.Metadata = catalog.Metadata{
"old": "metadata",
}
}
testutil.Must(t, deps.catalog.CreateEntry(ctx, repo, "main", entry))

resp, err := clt.UpdateObjectUserMetadataWithResponse(ctx, repo, "main",
&apigen.UpdateObjectUserMetadataParams{Path: objPath},
body,
)
verifyResponseOK(t, resp, err)
if doCommit {
_, err := deps.catalog.Commit(ctx, repo, "main", "First commit!", t.Name(), nil, nil, nil, false)
testutil.MustDo(t, "Commit", err)
}

// Verify that it was set
statResp, err := clt.StatObjectWithResponse(ctx, repo, "main",
&apigen.StatObjectParams{
Path: objPath,
UserMetadata: swag.Bool(true),
},
)
verifyResponseOK(t, statResp, err)
objectStats := statResp.JSON200
if diffs := deep.Equal(objectStats.Metadata.AdditionalProperties, userMetadataMap); diffs != nil {
t.Errorf("did not get expected metadata, diffs %s", diffs)
userMetadataMap := map[string]string{
"foo": "bar",
"baz": "quux",
}

body := apigen.UpdateObjectUserMetadataJSONRequestBody{
Set: apigen.ObjectUserMetadata{
AdditionalProperties: userMetadataMap,
},
}

resp, err := clt.UpdateObjectUserMetadataWithResponse(ctx, repo, "main",
&apigen.UpdateObjectUserMetadataParams{Path: objPath},
body,
)
verifyResponseOK(t, resp, err)

// Verify that it was set
statResp, err := clt.StatObjectWithResponse(ctx, repo, "main",
&apigen.StatObjectParams{
Path: objPath,
UserMetadata: swag.Bool(true),
},
)
verifyResponseOK(t, statResp, err)
objectStats := statResp.JSON200
if diffs := deep.Equal(objectStats.Metadata.AdditionalProperties, userMetadataMap); diffs != nil {
t.Errorf("did not get expected metadata, diffs %s", diffs)
}
})
}
})
}

t.Run("update metadata not found", func(t *testing.T) {
const objPath = "foo/not/found/bar"
Expand Down
33 changes: 33 additions & 0 deletions pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,39 @@ func (c *Catalog) GetEntry(ctx context.Context, repositoryID string, reference s
return &catalogEntry, nil
}

// UpdateEntryUserMetadata updates user metadata for the current entry for a
// path in repository branch reference.
func (c *Catalog) UpdateEntryUserMetadata(ctx context.Context, repositoryID, branch, path string, newUserMetadata map[string]string) error {
branchID := graveler.BranchID(branch)
if err := validator.Validate([]validator.ValidateArg{
{Name: "repository", Value: repositoryID, Fn: graveler.ValidateRepositoryID},
{Name: "branch", Value: branchID, Fn: graveler.ValidateBranchID},
{Name: "path", Value: Path(path), Fn: ValidatePath},
}); err != nil {
return err
}

repository, err := c.getRepository(ctx, repositoryID)
if err != nil {
return nil
}

key := graveler.Key(path)
updater := graveler.ValueUpdateFunc(func(value *graveler.Value) (*graveler.Value, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I have forgotten to submit this comment in the previous review.
It also relates to @itaiad200 comment:
If the intention of the function is to update the metadata only - let's pass the metadata instead of an update function. make it explicit, inline and easier to read.
Otherwise lets modify the graveler function to be UpdateObject and provide the update function accordingly (which is infact exactly the same as calling g.StagingManager.Update).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately graveler cannot run ValueToEntry - graveler is not supposed to know what type of values it handles.

Graveler.UpdateObject, as it is now called, indeed uses g.StagingManager.Update. But it cannot use just that! It needs to be able to update (in staging after the operation) a committed object (not in staging before the operation).

if value == nil {
return nil, fmt.Errorf("update user metadata on %s/%s/%s: %w",
repositoryID, branchID, path, graveler.ErrNotFound)
}
entry, err := ValueToEntry(value)
if err != nil {
return nil, err
}
entry.Metadata = newUserMetadata
return EntryToValue(entry)
})
return c.Store.Update(ctx, repository, branchID, key, updater)
}

func newEntryFromCatalogEntry(entry DBEntry) *Entry {
ent := &Entry{
Address: entry.PhysicalAddress,
Expand Down
13 changes: 13 additions & 0 deletions pkg/catalog/fake_graveler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,19 @@ func (g *FakeGraveler) Set(_ context.Context, repository *graveler.RepositoryRec
return nil
}

func (g *FakeGraveler) Update(ctx context.Context, repository *graveler.RepositoryRecord, branchID graveler.BranchID, key graveler.Key, update graveler.ValueUpdateFunc, opts ...graveler.SetOptionsFunc) error {
if g.Err != nil {
return g.Err
}
k := fakeGravelerBuildKey(repository.RepositoryID, graveler.Ref(branchID.String()), key)
value, err := update(g.KeyValue[k])
if err != nil {
return err
}
g.KeyValue[k] = value
return nil
}

func (g *FakeGraveler) Delete(ctx context.Context, repository *graveler.RepositoryRecord, branchID graveler.BranchID, key graveler.Key, _ ...graveler.SetOptionsFunc) error {
return nil
}
Expand Down
48 changes: 46 additions & 2 deletions pkg/graveler/graveler.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,10 @@ type KeyValueStore interface {
// Set stores value on repository / branch by key. nil value is a valid value for tombstone
Set(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, value Value, opts ...SetOptionsFunc) error

// Update atomically runs update on repository / branch by key. (Of course, if entry
// is only on committed, the updated entry will still be created (atomically) on staging.)
Update(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, update ValueUpdateFunc, opts ...SetOptionsFunc) error

// Delete value from repository / branch by key
Delete(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, opts ...SetOptionsFunc) error

Expand Down Expand Up @@ -1793,8 +1797,7 @@ func (g *Graveler) Set(ctx context.Context, repository *RepositoryRecord, branch
}

// safeBranchWrite repeatedly attempts to perform stagingOperation, retrying
// if the staging token changes during the write. It never backs off. It
// returns the number of times it tried -- between 1 and options.MaxTries.
// if the staging token changes during the write. It never backs off.
func (g *Graveler) safeBranchWrite(ctx context.Context, log logging.Logger, repository *RepositoryRecord, branchID BranchID,
options safeBranchWriteOptions, stagingOperation func(branch *Branch) error, operation string,
) error {
Expand Down Expand Up @@ -1841,6 +1844,47 @@ func (g *Graveler) safeBranchWrite(ctx context.Context, log logging.Logger, repo
return nil
}

func (g *Graveler) Update(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, update ValueUpdateFunc, opts ...SetOptionsFunc) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙏🏽 👍🏽

isProtected, err := g.protectedBranchesManager.IsBlocked(ctx, repository, branchID, BranchProtectionBlockedAction_STAGING_WRITE)
if err != nil {
return err
}
if isProtected {
return ErrWriteToProtectedBranch
}

options := NewSetOptions(opts)
if repository.ReadOnly && !options.Force {
return ErrReadOnlyRepository
}

log := g.log(ctx).WithFields(logging.Fields{"key": key, "operation": "update_user_metadata"})

// committedValue, if non-nil is a value read from either uncommitted or committed. Usually
// it is read from committed. If there is a value on staging, that entry will be modified
// and committedValue will never be read.
var committedValue *Value

err = g.safeBranchWrite(ctx, log, repository, branchID, safeBranchWriteOptions{MaxTries: options.MaxTries}, func(branch *Branch) error {
return g.StagingManager.Update(ctx, branch.StagingToken, key, func(currentValue *Value) (*Value, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you shouldn't retry if the entry was changed, i.e. the StagingManager SetMsgIf failed due to the predicate

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If staging operation fails (i.e. predicate failed) there will not be any retry

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. The retry is in case the branch head changes (same as in Set). In that case, we must retry: the update might have succeeded on staging after the commit scanned the key. If we return and succeed, that would be a lost update. So we try again; this might cause staging to have an equal value to committed or not - and both cases are good.

if currentValue == nil {
// Object not on staging: need to update committed value.
if committedValue == nil {
Comment on lines +1870 to +1872
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if currentValue == nil {
// Object not on staging: need to update committed value.
if committedValue == nil {
if currentValue == nil && committedValue == nil {
// Object not on staging: need to update committed value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not done, this changes behaviour. The current code reads the committed value once, if it needs to, and then always uses that value for an update if the value is not on staging.

For correctness, it is enough to use a committed value (from after the start of the operation). For efficiency, we want to read that committed value as few times as possible. This will be 0 times if the object exists on staging, so we read it the first time we need it.

committedValue, err = g.Get(ctx, repository, Ref(branchID), key)
if err != nil {
// (Includes ErrNotFound)
return nil, fmt.Errorf("read from committed: %w", err)
}
}
// Get always returns a non-nil value or an error.
currentValue = committedValue
}
return update(currentValue)
})
}, "update_metadata")
return err
}

func (g *Graveler) Delete(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, opts ...SetOptionsFunc) error {
isProtected, err := g.protectedBranchesManager.IsBlocked(ctx, repository, branchID, BranchProtectionBlockedAction_STAGING_WRITE)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions pkg/graveler/mock/graveler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading