-
Notifications
You must be signed in to change notification settings - Fork 350
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make update object metadata API atomic #8264
Conversation
🎊 PR Preview bb6c444 has been successfully built and deployed to https://treeverse-lakeFS-preview-pr-8264.surge.sh 🕐 Build time: 0.01s 🤖 By surge-preview |
Reviewers: I know that tests fail. Please review anyway, we will likely need >1 round -- and these will be easy enough to fix, and not in the areas modified in this PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
LookED good to Nir 😭 |
Protect updating object metadata on an existing entry to be safe against concurrent modifications, deletions, and commits and merges to the branch. This is a bit tricky because it needs to work for both committed and staged objects. Fixes #8262 - a race that would give an odd result if update object metadata managed to lose against concurrent delete and uncommitted GC, or potentially merges into the branch.
Allow tests to pass. (Also rebase on trunk latest)
bb6c444
to
16e5da2
Compare
I'll review it today 😃 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks - looks neat.
I did not review the tests yet as I had some questions regarding the logic itself
pkg/graveler/graveler.go
Outdated
// getFromBranchWithStaging returns the value for key on repository and | ||
// branch. It returns true if the object was found in the staging area, | ||
// otherwise false. It does not micro-batch. | ||
func (g *Graveler) getFromBranchWithStaging(ctx context.Context, repository *RepositoryRecord, branch *Branch, key Key) (*Value, bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this function different from g.Get
(which is used today by Set
and resetKey
) and can we reuse this logic there if needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right! Actually below I want to do something else on a compacted metarange. So I removed this function in favour of getFromStaging below, and added a test.
pkg/graveler/graveler.go
Outdated
return g.StagingManager.Update(ctx, branch.StagingToken, key, func(currentValue *Value) (*Value, error) { | ||
if currentValue == nil { | ||
if onStaging { | ||
// Object previously on staging, and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
I know this is your line length configuration but this way of breaking the comment makes it very confusing to understand. Consider leaving it in a single line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed. Now I find it harder to read, but I don't get to review my own code :-/
pkg/graveler/graveler.go
Outdated
|
||
return g.StagingManager.Update(ctx, branch.StagingToken, key, func(currentValue *Value) (*Value, error) { | ||
if currentValue == nil { | ||
if onStaging { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need this piece of code?
Considering this will just fail during the update due to the change on the branch?
This can also result in inconsistent behavior:
If we catch the deletion before this part we will return ErrNotFound
but if the delete occurs after this check we will get PreconditionFalied
.
This might also cause confusion when debugging
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excellent question!
Considering this will just fail during the update due to the change on the branch?
Yes but: returning ErrNotFound here will immediately fail safeBranchWrite and return the same error. This is correct behaviour.
I am not sure how this ever returns PreconditionFailed TBH.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well due to the code changes it is not relevant but, consider the two timings:
- Object is deleted from staging before this line - we will return ErrNotFound
- Object is initially read (with pred) at the beginning of
g.StagingManager.Update
then immediately deleted and we use the setMsgIf using the pred - and we getErrPredicateFailed
In both scenarios the object was deleted in staging during UpdateMetadata
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
I think that you were right and still are, and that this still happens. But only during a race. It is legit to fail an update if it occurs concurrently with a deletion. Returning ErrNotFound is the best way, which serializes DELETE before UPDATE. But returning ErrPredicateFailed is equally valid - it indicates that Graveler was unable to serialize the operations.
I doubt this will matter in practice. If it does, we will need to add a retry.
pkg/graveler/graveler.go
Outdated
} | ||
// Use committed value (in practice it came | ||
// from committed, so it is non-nil). | ||
currentValue = value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand correctly, the only reason we get the value before the update is because we perform retries. In that case I think it's important to document that so it will be clear what we're doing here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, so I decided to remove this prefetch entirely in favour of a single fetch-from-committed. That means I don't need to protect pick "the right" value, I can just use the latest. Changed to do that, and I think the new code is much easier to read!
I think the version you did not like would not work with a compacted base metarange. Currently we never set a compacted base metarange, so this was not tested before and is still not tested.
I like that the new code has fewer comments, I think it doesn't need them (diff stats are +15 -42
). Please let me know if you think that more comments are needed :-/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
THANKS for a thorough review! PTAL, I think I changed the bits you did not like. It is definitely shorter now; this implies fewer bugs if we assume equal bug densities.
pkg/graveler/graveler.go
Outdated
// getFromBranchWithStaging returns the value for key on repository and | ||
// branch. It returns true if the object was found in the staging area, | ||
// otherwise false. It does not micro-batch. | ||
func (g *Graveler) getFromBranchWithStaging(ctx context.Context, repository *RepositoryRecord, branch *Branch, key Key) (*Value, bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right! Actually below I want to do something else on a compacted metarange. So I removed this function in favour of getFromStaging below, and added a test.
pkg/graveler/graveler.go
Outdated
|
||
return g.StagingManager.Update(ctx, branch.StagingToken, key, func(currentValue *Value) (*Value, error) { | ||
if currentValue == nil { | ||
if onStaging { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excellent question!
Considering this will just fail during the update due to the change on the branch?
Yes but: returning ErrNotFound here will immediately fail safeBranchWrite and return the same error. This is correct behaviour.
I am not sure how this ever returns PreconditionFailed TBH.
pkg/graveler/graveler.go
Outdated
return g.StagingManager.Update(ctx, branch.StagingToken, key, func(currentValue *Value) (*Value, error) { | ||
if currentValue == nil { | ||
if onStaging { | ||
// Object previously on staging, and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed. Now I find it harder to read, but I don't get to review my own code :-/
pkg/graveler/graveler.go
Outdated
} | ||
// Use committed value (in practice it came | ||
// from committed, so it is non-nil). | ||
currentValue = value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, so I decided to remove this prefetch entirely in favour of a single fetch-from-committed. That means I don't need to protect pick "the right" value, I can just use the latest. Changed to do that, and I think the new code is much easier to read!
I think the version you did not like would not work with a compacted base metarange. Currently we never set a compacted base metarange, so this was not tested before and is still not tested.
I like that the new code has fewer comments, I think it doesn't need them (diff stats are +15 -42
). Please let me know if you think that more comments are needed :-/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good - thank you! Requesting changes due to the entry update failure on predicate failure
pkg/graveler/graveler.go
Outdated
// UpdateObjectMetadata atomically runs update on repository / | ||
// branch by key. (Of course, if entry is only on committed, the | ||
// updated entry will still be created (atomically) on staging.) | ||
UpdateObjectMetadata(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, update ValueUpdateFunc, opts ...SetOptionsFunc) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know if you meant to write it this way, but you can update more than just the metadata. If it's intended, let's rename the func. If not, let's rewrite the logic.
UpdateObjectMetadata(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, update ValueUpdateFunc, opts ...SetOptionsFunc) error | |
UpdateObject(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, update ValueUpdateFunc, opts ...SetOptionsFunc) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO it can update only object metadata - not object data. The name UpdateObject
makes it seem like it can do more. OTOH, the parameters make it clear exactly what it can do... so done.
Checksum: "this_is_a_checksum", | ||
} | ||
testutil.Must(t, deps.catalog.CreateEntry(ctx, repo, "main", entry)) | ||
bools := []bool{false, true} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice tests!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:-)
As you can imagine, I added the loops only after writing the third of the four tests 🤡 .
var committedValue *Value | ||
|
||
err = g.safeBranchWrite(ctx, log, repository, branchID, safeBranchWriteOptions{MaxTries: options.MaxTries}, func(branch *Branch) error { | ||
return g.StagingManager.Update(ctx, branch.StagingToken, key, func(currentValue *Value) (*Value, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe you shouldn't retry if the entry was changed, i.e. the StagingManager SetMsgIf
failed due to the predicate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If staging operation fails (i.e. predicate failed) there will not be any retry
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed. The retry is in case the branch head changes (same as in Set). In that case, we must retry: the update might have succeeded on staging after the commit scanned the key. If we return and succeed, that would be a lost update. So we try again; this might cause staging to have an equal value to committed or not - and both cases are good.
} | ||
|
||
key := graveler.Key(path) | ||
updater := graveler.ValueUpdateFunc(func(value *graveler.Value) (*graveler.Value, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I have forgotten to submit this comment in the previous review.
It also relates to @itaiad200 comment:
If the intention of the function is to update the metadata only - let's pass the metadata instead of an update function. make it explicit, inline and easier to read.
Otherwise lets modify the graveler function to be UpdateObject
and provide the update function accordingly (which is infact exactly the same as calling g.StagingManager.Update
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately graveler cannot run ValueToEntry - graveler is not supposed to know what type of values it handles.
Graveler.UpdateObject, as it is now called, indeed uses g.StagingManager.Update
. But it cannot use just that! It needs to be able to update (in staging after the operation) a committed object (not in staging before the operation).
var committedValue *Value | ||
|
||
err = g.safeBranchWrite(ctx, log, repository, branchID, safeBranchWriteOptions{MaxTries: options.MaxTries}, func(branch *Branch) error { | ||
return g.StagingManager.Update(ctx, branch.StagingToken, key, func(currentValue *Value) (*Value, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If staging operation fails (i.e. predicate failed) there will not be any retry
if currentValue == nil { | ||
// Object not on staging: need to update committed value. | ||
if committedValue == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if currentValue == nil { | |
// Object not on staging: need to update committed value. | |
if committedValue == nil { | |
if currentValue == nil && committedValue == nil { | |
// Object not on staging: need to update committed value. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not done, this changes behaviour. The current code reads the committed value once, if it needs to, and then always uses that value for an update if the value is not on staging.
For correctness, it is enough to use a committed value (from after the start of the operation). For efficiency, we want to read that committed value as few times as possible. This will be 0 times if the object exists on staging, so we read it the first time we need it.
pkg/graveler/graveler.go
Outdated
|
||
return g.StagingManager.Update(ctx, branch.StagingToken, key, func(currentValue *Value) (*Value, error) { | ||
if currentValue == nil { | ||
if onStaging { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well due to the code changes it is not relevant but, consider the two timings:
- Object is deleted from staging before this line - we will return ErrNotFound
- Object is initially read (with pred) at the beginning of
g.StagingManager.Update
then immediately deleted and we use the setMsgIf using the pred - and we getErrPredicateFailed
In both scenarios the object was deleted in staging during UpdateMetadata
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code is much clearer now IMO!
Added one more major comment which I seem to have left out of the previous review
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
I ended up accepting comments about naming and error messages, but not changing the interface, using StagingManager.Update directly, or changing the interface of Graveler.Update. I hope I explained why in my comments.
PTAL...
pkg/graveler/graveler.go
Outdated
// UpdateObjectMetadata atomically runs update on repository / | ||
// branch by key. (Of course, if entry is only on committed, the | ||
// updated entry will still be created (atomically) on staging.) | ||
UpdateObjectMetadata(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, update ValueUpdateFunc, opts ...SetOptionsFunc) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO it can update only object metadata - not object data. The name UpdateObject
makes it seem like it can do more. OTOH, the parameters make it clear exactly what it can do... so done.
var committedValue *Value | ||
|
||
err = g.safeBranchWrite(ctx, log, repository, branchID, safeBranchWriteOptions{MaxTries: options.MaxTries}, func(branch *Branch) error { | ||
return g.StagingManager.Update(ctx, branch.StagingToken, key, func(currentValue *Value) (*Value, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed. The retry is in case the branch head changes (same as in Set). In that case, we must retry: the update might have succeeded on staging after the commit scanned the key. If we return and succeed, that would be a lost update. So we try again; this might cause staging to have an equal value to committed or not - and both cases are good.
if currentValue == nil { | ||
// Object not on staging: need to update committed value. | ||
if committedValue == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not done, this changes behaviour. The current code reads the committed value once, if it needs to, and then always uses that value for an update if the value is not on staging.
For correctness, it is enough to use a committed value (from after the start of the operation). For efficiency, we want to read that committed value as few times as possible. This will be 0 times if the object exists on staging, so we read it the first time we need it.
Checksum: "this_is_a_checksum", | ||
} | ||
testutil.Must(t, deps.catalog.CreateEntry(ctx, repo, "main", entry)) | ||
bools := []bool{false, true} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:-)
As you can imagine, I added the loops only after writing the third of the four tests 🤡 .
} | ||
|
||
key := graveler.Key(path) | ||
updater := graveler.ValueUpdateFunc(func(value *graveler.Value) (*graveler.Value, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately graveler cannot run ValueToEntry - graveler is not supposed to know what type of values it handles.
Graveler.UpdateObject, as it is now called, indeed uses g.StagingManager.Update
. But it cannot use just that! It needs to be able to update (in staging after the operation) a committed object (not in staging before the operation).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the quick fixes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!!!!
One typo comment
pkg/graveler/graveler.go
Outdated
log := g.log(ctx).WithFields(logging.Fields{"key": key, "operation": "update_user_metadata"}) | ||
|
||
// committedValue, if non-nil is a value read from either uncommitted or committed. Usually | ||
// it it is read from committed. If there is a value on staging, that entry will be |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// it it is read from committed. If there is a value on staging, that entry will be | |
// it is read from committed. If there is a value on staging, that entry will be |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -1841,6 +1844,47 @@ func (g *Graveler) safeBranchWrite(ctx context.Context, log logging.Logger, repo | |||
return nil | |||
} | |||
|
|||
func (g *Graveler) Update(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, update ValueUpdateFunc, opts ...SetOptionsFunc) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🙏🏽 👍🏽
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Pulling...
pkg/graveler/graveler.go
Outdated
log := g.log(ctx).WithFields(logging.Fields{"key": key, "operation": "update_user_metadata"}) | ||
|
||
// committedValue, if non-nil is a value read from either uncommitted or committed. Usually | ||
// it it is read from committed. If there is a value on staging, that entry will be |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Protect updating object metadata on an existing entry to be safe against concurrent modifications, deletions, and commits and merges to the branch. This is a bit tricky because it needs to work for both committed and staged objects.
Fixes #8262 - a race that would give an odd result if update object metadata managed to lose against concurrent delete and uncommitted GC, or potentially merges into the branch.