-
Notifications
You must be signed in to change notification settings - Fork 350
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make update object metadata API atomic #8264
Changes from all commits
1950810
a15b37f
16e5da2
c546a0d
e15e7f8
0d25da1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1000,6 +1000,39 @@ func (c *Catalog) GetEntry(ctx context.Context, repositoryID string, reference s | |
return &catalogEntry, nil | ||
} | ||
|
||
// UpdateEntryUserMetadata updates user metadata for the current entry for a | ||
// path in repository branch reference. | ||
func (c *Catalog) UpdateEntryUserMetadata(ctx context.Context, repositoryID, branch, path string, newUserMetadata map[string]string) error { | ||
branchID := graveler.BranchID(branch) | ||
if err := validator.Validate([]validator.ValidateArg{ | ||
{Name: "repository", Value: repositoryID, Fn: graveler.ValidateRepositoryID}, | ||
{Name: "branch", Value: branchID, Fn: graveler.ValidateBranchID}, | ||
{Name: "path", Value: Path(path), Fn: ValidatePath}, | ||
}); err != nil { | ||
return err | ||
} | ||
|
||
repository, err := c.getRepository(ctx, repositoryID) | ||
if err != nil { | ||
return nil | ||
} | ||
|
||
key := graveler.Key(path) | ||
updater := graveler.ValueUpdateFunc(func(value *graveler.Value) (*graveler.Value, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I have forgotten to submit this comment in the previous review. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunately graveler cannot run ValueToEntry - graveler is not supposed to know what type of values it handles. Graveler.UpdateObject, as it is now called, indeed uses |
||
if value == nil { | ||
return nil, fmt.Errorf("update user metadata on %s/%s/%s: %w", | ||
repositoryID, branchID, path, graveler.ErrNotFound) | ||
} | ||
entry, err := ValueToEntry(value) | ||
if err != nil { | ||
return nil, err | ||
} | ||
entry.Metadata = newUserMetadata | ||
return EntryToValue(entry) | ||
}) | ||
return c.Store.Update(ctx, repository, branchID, key, updater) | ||
} | ||
|
||
func newEntryFromCatalogEntry(entry DBEntry) *Entry { | ||
ent := &Entry{ | ||
Address: entry.PhysicalAddress, | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -541,6 +541,10 @@ type KeyValueStore interface { | |||||||||||
// Set stores value on repository / branch by key. nil value is a valid value for tombstone | ||||||||||||
Set(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, value Value, opts ...SetOptionsFunc) error | ||||||||||||
|
||||||||||||
// Update atomically runs update on repository / branch by key. (Of course, if entry | ||||||||||||
// is only on committed, the updated entry will still be created (atomically) on staging.) | ||||||||||||
Update(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, update ValueUpdateFunc, opts ...SetOptionsFunc) error | ||||||||||||
|
||||||||||||
// Delete value from repository / branch by key | ||||||||||||
Delete(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, opts ...SetOptionsFunc) error | ||||||||||||
|
||||||||||||
|
@@ -1793,8 +1797,7 @@ func (g *Graveler) Set(ctx context.Context, repository *RepositoryRecord, branch | |||||||||||
} | ||||||||||||
|
||||||||||||
// safeBranchWrite repeatedly attempts to perform stagingOperation, retrying | ||||||||||||
// if the staging token changes during the write. It never backs off. It | ||||||||||||
// returns the number of times it tried -- between 1 and options.MaxTries. | ||||||||||||
// if the staging token changes during the write. It never backs off. | ||||||||||||
func (g *Graveler) safeBranchWrite(ctx context.Context, log logging.Logger, repository *RepositoryRecord, branchID BranchID, | ||||||||||||
options safeBranchWriteOptions, stagingOperation func(branch *Branch) error, operation string, | ||||||||||||
) error { | ||||||||||||
|
@@ -1841,6 +1844,47 @@ func (g *Graveler) safeBranchWrite(ctx context.Context, log logging.Logger, repo | |||||||||||
return nil | ||||||||||||
} | ||||||||||||
|
||||||||||||
func (g *Graveler) Update(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, update ValueUpdateFunc, opts ...SetOptionsFunc) error { | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🙏🏽 👍🏽 |
||||||||||||
isProtected, err := g.protectedBranchesManager.IsBlocked(ctx, repository, branchID, BranchProtectionBlockedAction_STAGING_WRITE) | ||||||||||||
if err != nil { | ||||||||||||
return err | ||||||||||||
} | ||||||||||||
if isProtected { | ||||||||||||
return ErrWriteToProtectedBranch | ||||||||||||
} | ||||||||||||
|
||||||||||||
options := NewSetOptions(opts) | ||||||||||||
if repository.ReadOnly && !options.Force { | ||||||||||||
return ErrReadOnlyRepository | ||||||||||||
} | ||||||||||||
|
||||||||||||
log := g.log(ctx).WithFields(logging.Fields{"key": key, "operation": "update_user_metadata"}) | ||||||||||||
|
||||||||||||
// committedValue, if non-nil is a value read from either uncommitted or committed. Usually | ||||||||||||
// it is read from committed. If there is a value on staging, that entry will be modified | ||||||||||||
// and committedValue will never be read. | ||||||||||||
var committedValue *Value | ||||||||||||
|
||||||||||||
err = g.safeBranchWrite(ctx, log, repository, branchID, safeBranchWriteOptions{MaxTries: options.MaxTries}, func(branch *Branch) error { | ||||||||||||
return g.StagingManager.Update(ctx, branch.StagingToken, key, func(currentValue *Value) (*Value, error) { | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe you shouldn't retry if the entry was changed, i.e. the StagingManager There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If staging operation fails (i.e. predicate failed) there will not be any retry There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed. The retry is in case the branch head changes (same as in Set). In that case, we must retry: the update might have succeeded on staging after the commit scanned the key. If we return and succeed, that would be a lost update. So we try again; this might cause staging to have an equal value to committed or not - and both cases are good. |
||||||||||||
if currentValue == nil { | ||||||||||||
// Object not on staging: need to update committed value. | ||||||||||||
if committedValue == nil { | ||||||||||||
Comment on lines
+1870
to
+1872
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not done, this changes behaviour. The current code reads the committed value once, if it needs to, and then always uses that value for an update if the value is not on staging. For correctness, it is enough to use a committed value (from after the start of the operation). For efficiency, we want to read that committed value as few times as possible. This will be 0 times if the object exists on staging, so we read it the first time we need it. |
||||||||||||
committedValue, err = g.Get(ctx, repository, Ref(branchID), key) | ||||||||||||
if err != nil { | ||||||||||||
// (Includes ErrNotFound) | ||||||||||||
return nil, fmt.Errorf("read from committed: %w", err) | ||||||||||||
} | ||||||||||||
} | ||||||||||||
// Get always returns a non-nil value or an error. | ||||||||||||
currentValue = committedValue | ||||||||||||
} | ||||||||||||
return update(currentValue) | ||||||||||||
}) | ||||||||||||
}, "update_metadata") | ||||||||||||
return err | ||||||||||||
} | ||||||||||||
|
||||||||||||
func (g *Graveler) Delete(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, opts ...SetOptionsFunc) error { | ||||||||||||
isProtected, err := g.protectedBranchesManager.IsBlocked(ctx, repository, branchID, BranchProtectionBlockedAction_STAGING_WRITE) | ||||||||||||
if err != nil { | ||||||||||||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice tests!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:-)
As you can imagine, I added the loops only after writing the third of the four tests 🤡 .