diff --git a/pkg/graveler/retention/active_commits.go b/pkg/graveler/retention/active_commits.go index 24c5e32454b..60824b7b7f4 100644 --- a/pkg/graveler/retention/active_commits.go +++ b/pkg/graveler/retention/active_commits.go @@ -2,11 +2,11 @@ package retention import ( "context" - "errors" "fmt" "time" "github.com/treeverse/lakefs/pkg/graveler" + "github.com/treeverse/lakefs/pkg/logging" ) type CommitNode struct { @@ -15,52 +15,115 @@ type CommitNode struct { MetaRangeID graveler.MetaRangeID } -func NewCommitNode(creationDate time.Time, mainParent graveler.CommitID, metaRangeID graveler.MetaRangeID) CommitNode { +// CommitsMap is an immutable cache of commits. Each commit can be set +// once, and will be read if needed. It is *not* thread-safe. +type CommitsMap struct { + ctx context.Context + Log logging.Logger + NumMisses int64 + CommitGetter RepositoryCommitGetter + Map map[graveler.CommitID]CommitNode +} + +func NewCommitsMap(ctx context.Context, commitGetter RepositoryCommitGetter) (CommitsMap, error) { + initialMap := make(map[graveler.CommitID]CommitNode) + it, err := commitGetter.List(ctx) + if err != nil { + return CommitsMap{}, fmt.Errorf("list existing commits into map: %w", err) + } + defer it.Close() + for it.Next() { + commit := it.Value() + initialMap[commit.CommitID] = nodeFromCommit(commit.Commit) + } + return CommitsMap{ + ctx: ctx, + Log: logging.FromContext(ctx), + NumMisses: int64(0), + CommitGetter: commitGetter, + Map: initialMap, + }, nil +} + +// Set sets a commit. It will not be looked up again in CommitGetter. +func (c *CommitsMap) Set(id graveler.CommitID, node CommitNode) { + c.Map[id] = node +} + +// Get gets a commit. If the commit has not been Set it uses CommitGetter +// to read it. +func (c *CommitsMap) Get(id graveler.CommitID) (CommitNode, error) { + ret, ok := c.Map[id] + if ok { + return ret, nil + } + // Unlikely: id raced with initial bunch of Sets. + commit, err := c.CommitGetter.Get(c.ctx, id) + if err != nil { + return CommitNode{}, fmt.Errorf("get missing commit ID %s: %w", id, err) + } + ret = nodeFromCommit(commit) + c.Map[id] = ret + c.NumMisses++ + c.Log.WithFields(logging.Fields{ + "commit_id": id, + "created": ret.CreationDate, + "age": time.Since(ret.CreationDate), + }).Warn("Loaded single commit, probably new") + return ret, nil +} + +// GetMap returns the entire map of commits. It is probably incorrect to +// modify it. +func (c *CommitsMap) GetMap() map[graveler.CommitID]CommitNode { + return c.Map +} + +// nodeFromCommit returns a new CommitNode for a Commit. +func nodeFromCommit(commit *graveler.Commit) CommitNode { + var mainParent graveler.CommitID + if len(commit.Parents) > 0 { + // every branch retains only its main ancestry, acquired by recursively taking the first parent: + mainParent = commit.Parents[0] + if commit.Version < graveler.CommitVersionParentSwitch { + mainParent = commit.Parents[len(commit.Parents)-1] + } + } return CommitNode{ - CreationDate: creationDate, + CreationDate: commit.CreationDate, MainParent: mainParent, - MetaRangeID: metaRangeID, + MetaRangeID: commit.MetaRangeID, } } -var ErrCommitNotFound = errors.New("commit not found") - // GetGarbageCollectionCommits returns the sets of active commits, according to the repository's garbage collection rules. // See https://github.com/treeverse/lakeFS/issues/1932 for more details. // Upon completion, the given startingPointIterator is closed. -func GetGarbageCollectionCommits(ctx context.Context, startingPointIterator *GCStartingPointIterator, commitGetter *RepositoryCommitGetter, rules *graveler.GarbageCollectionRules) (map[graveler.CommitID]graveler.MetaRangeID, error) { +func GetGarbageCollectionCommits(ctx context.Context, startingPointIterator *GCStartingPointIterator, commitGetter RepositoryCommitGetter, rules *graveler.GarbageCollectionRules) (map[graveler.CommitID]graveler.MetaRangeID, error) { // From each starting point in the given startingPointIterator, it iterates through its main ancestry. // All commits reached are added to the active set, until and including the first commit performed before the start of the retention period. processed := make(map[graveler.CommitID]time.Time) activeMap := make(map[graveler.CommitID]struct{}) - commitsIterator, err := commitGetter.ListCommits(ctx) + commitsMap, err := NewCommitsMap(ctx, commitGetter) if err != nil { - return nil, err - } - commitsMap := make(map[graveler.CommitID]CommitNode) - defer commitsIterator.Close() - for commitsIterator.Next() { - commitRecord := commitsIterator.Value() - var mainParent graveler.CommitID - if len(commitRecord.Commit.Parents) > 0 { - // every branch retains only its main ancestry, acquired by recursively taking the first parent: - mainParent = commitRecord.Commit.Parents[0] - if commitRecord.Commit.Version < graveler.CommitVersionParentSwitch { - mainParent = commitRecord.Commit.Parents[len(commitRecord.Commit.Parents)-1] - } - } - commitsMap[commitRecord.CommitID] = NewCommitNode(commitRecord.Commit.CreationDate, mainParent, commitRecord.MetaRangeID) + return nil, fmt.Errorf("initial read commits: %w", err) } + // Observe NumMisses. This should not be a metric unless we see it happen a _lot_. + defer func() { + logging.FromContext(ctx). + WithField("num_misses", commitsMap.NumMisses). + Info("Commits map - misses are due to concurrent commits") + }() now := time.Now() defer startingPointIterator.Close() for startingPointIterator.Next() { startingPoint := startingPointIterator.Value() retentionDays := int(rules.DefaultRetentionDays) - commitNode, ok := commitsMap[startingPoint.CommitID] - if !ok { - return nil, fmt.Errorf("%w: %s", ErrCommitNotFound, startingPoint.CommitID) + commitNode, err := commitsMap.Get(startingPoint.CommitID) + if err != nil { + return nil, fmt.Errorf("%w: %s", err, startingPoint.CommitID) } if startingPoint.BranchID == "" { // If the current commit is NOT a branch HEAD (a dangling commit) - add a hypothetical HEAD as its child @@ -70,8 +133,7 @@ func GetGarbageCollectionCommits(ctx context.Context, startingPointIterator *GCS } } else { // If the current commit IS a branch HEAD - fetch and retention rules for this branch and... - var branchRetentionDays int32 - if branchRetentionDays, ok = rules.BranchRetentionDays[string(startingPoint.BranchID)]; ok { + if branchRetentionDays, ok := rules.BranchRetentionDays[string(startingPoint.BranchID)]; ok { retentionDays = int(branchRetentionDays) } activeMap[startingPoint.CommitID] = struct{}{} @@ -86,8 +148,7 @@ func GetGarbageCollectionCommits(ctx context.Context, startingPointIterator *GCS // Start traversing the commit's ancestors (path): for commitNode.MainParent != "" { nextCommitID := commitNode.MainParent - var previousThreshold time.Time - if previousThreshold, ok = processed[nextCommitID]; ok && !previousThreshold.After(branchExpirationThreshold) { + if previousThreshold, ok := processed[nextCommitID]; ok && !previousThreshold.After(branchExpirationThreshold) { // If the parent commit was already processed and its threshold was longer than the current threshold, // i.e. the current threshold doesn't hold for it, stop processing it because the other path decision // wins @@ -100,9 +161,9 @@ func GetGarbageCollectionCommits(ctx context.Context, startingPointIterator *GCS activeMap[nextCommitID] = struct{}{} } // Continue down the rabbit hole. - commitNode, ok = commitsMap[nextCommitID] - if !ok { - return nil, fmt.Errorf("%w: %s", ErrCommitNotFound, nextCommitID) + commitNode, err = commitsMap.Get(nextCommitID) + if err != nil { + return nil, fmt.Errorf("%w: %s", err, nextCommitID) } // Set the parent commit ID's expiration threshold as the current (this is true because this one is the // longest, because we wouldn't have gotten here otherwise) @@ -112,7 +173,7 @@ func GetGarbageCollectionCommits(ctx context.Context, startingPointIterator *GCS if startingPointIterator.Err() != nil { return nil, startingPointIterator.Err() } - return makeCommitMap(commitsMap, activeMap), nil + return makeCommitMap(commitsMap.GetMap(), activeMap), nil } func makeCommitMap(commitNodes map[graveler.CommitID]CommitNode, commitSet map[graveler.CommitID]struct{}) map[graveler.CommitID]graveler.MetaRangeID { diff --git a/pkg/graveler/retention/active_commits_test.go b/pkg/graveler/retention/active_commits_test.go index 0708511d06a..46ffd05a5de 100644 --- a/pkg/graveler/retention/active_commits_test.go +++ b/pkg/graveler/retention/active_commits_test.go @@ -2,6 +2,8 @@ package retention import ( "context" + "errors" + "fmt" "sort" "testing" "time" @@ -58,6 +60,111 @@ func findMainAncestryLeaves(now time.Time, heads map[string]int32, commits map[s return res } +// fakeRepositoryCommitGetter is a RepositoryCommitGetter used to test CommitsMap. +type fakeRepositoryCommitGetter struct { + // Commits is the list of pre-existing commits. + Commits []*graveler.CommitRecord + // AnotherCommit, if not nil, is returned from Get (but not List). + // It can only be returned once. + AnotherCommit *graveler.CommitRecord + // GetCalled is true after the first call to Get. + GetCalled bool +} + +var errTooManyGets = errors.New("more than one Get on fakeRepositoryCommitGetter") + +func (c *fakeRepositoryCommitGetter) List(_ context.Context) (graveler.CommitIterator, error) { + return testutil.NewFakeCommitIterator(c.Commits), nil +} + +func (c *fakeRepositoryCommitGetter) Get(_ context.Context, id graveler.CommitID) (*graveler.Commit, error) { + if c.AnotherCommit == nil { + return nil, fmt.Errorf("%w: Get(%s) when no other commits", graveler.ErrNotFound, id) + } + if id != c.AnotherCommit.CommitID { + return nil, fmt.Errorf("%w: Get(%s) when expecting Get(%s)", graveler.ErrNotFound, id, c.AnotherCommit.CommitID) + } + if c.GetCalled { + return nil, fmt.Errorf("%s: %w", id, errTooManyGets) + } + c.GetCalled = true + return c.AnotherCommit.Commit, nil +} + +func TestCommitsMap(t *testing.T) { + cases := []struct { + Name string + CommitGetter *fakeRepositoryCommitGetter + }{ + { + Name: "FromList", + CommitGetter: &fakeRepositoryCommitGetter{ + Commits: []*graveler.CommitRecord{ + { + CommitID: "a", + Commit: &graveler.Commit{ + MetaRangeID: graveler.MetaRangeID("metarange:A"), + }, + }, + { + CommitID: "b", + Commit: &graveler.Commit{ + MetaRangeID: graveler.MetaRangeID("metarange:B"), + }, + }, + }, + AnotherCommit: nil, + }, + }, { + Name: "FromGet", + CommitGetter: &fakeRepositoryCommitGetter{ + Commits: []*graveler.CommitRecord{ + { + CommitID: "a", + Commit: &graveler.Commit{ + MetaRangeID: graveler.MetaRangeID("metarange:A"), + }, + }, + }, + AnotherCommit: &graveler.CommitRecord{ + CommitID: "b", + Commit: &graveler.Commit{ + MetaRangeID: graveler.MetaRangeID("metarange:B"), + }, + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + commitsMap, err := NewCommitsMap(context.Background(), tc.CommitGetter) + if err != nil { + t.Fatal(err) + } + + a, err := commitsMap.Get(graveler.CommitID("a")) + if err != nil { + t.Errorf("Failed to get a: %s", err) + } + if a.MetaRangeID != "metarange:A" { + t.Errorf("Got metarange %s for a, expected \"metarange:A\"", a.MetaRangeID) + } + b, err := commitsMap.Get(graveler.CommitID("b")) + if err != nil { + t.Errorf("Failed to get b: %s", err) + } + if b.MetaRangeID != "metarange:B" { + t.Errorf("Got metarange %s for b, expected \"metarange:B\"", b.MetaRangeID) + } + c, err := commitsMap.Get(graveler.CommitID("c")) + if !errors.Is(err, graveler.ErrNotFound) { + t.Errorf("Got node %+v, error %s for c, expected not found", c.MetaRangeID, err) + } + }) + } +} + func TestActiveCommits(t *testing.T) { tests := map[string]struct { commits map[string]testCommit @@ -264,7 +371,7 @@ func TestActiveCommits(t *testing.T) { gcCommits, err := GetGarbageCollectionCommits(ctx, NewGCStartingPointIterator( testutil.NewFakeCommitIterator(findMainAncestryLeaves(now, tst.headsRetentionDays, tst.commits)), - testutil.NewFakeBranchIterator(branches)), &RepositoryCommitGetter{ + testutil.NewFakeBranchIterator(branches)), &repositoryCommitGetter{ refManager: refManagerMock, repository: repositoryRecord, }, garbageCollectionRules) diff --git a/pkg/graveler/retention/garbage_collection_manager.go b/pkg/graveler/retention/garbage_collection_manager.go index a35722020dc..b61a6253c51 100644 --- a/pkg/graveler/retention/garbage_collection_manager.go +++ b/pkg/graveler/retention/garbage_collection_manager.go @@ -87,15 +87,24 @@ func (m *GarbageCollectionManager) SaveGarbageCollectionUncommitted(ctx context. }, stat.Size(), fd, block.PutOpts{}) } -type RepositoryCommitGetter struct { +type RepositoryCommitGetter interface { + List(ctx context.Context) (graveler.CommitIterator, error) + Get(ctx context.Context, id graveler.CommitID) (*graveler.Commit, error) +} + +type repositoryCommitGetter struct { refManager graveler.RefManager repository *graveler.RepositoryRecord } -func (r *RepositoryCommitGetter) ListCommits(ctx context.Context) (graveler.CommitIterator, error) { +func (r *repositoryCommitGetter) List(ctx context.Context) (graveler.CommitIterator, error) { return r.refManager.ListCommits(ctx, r.repository) } +func (r *repositoryCommitGetter) Get(ctx context.Context, id graveler.CommitID) (*graveler.Commit, error) { + return r.refManager.GetCommit(ctx, r.repository, id) +} + func NewGarbageCollectionManager(blockAdapter block.Adapter, refManager graveler.RefManager, committedBlockStoragePrefix string) *GarbageCollectionManager { return &GarbageCollectionManager{ blockAdapter: blockAdapter, @@ -149,7 +158,7 @@ func (m *GarbageCollectionManager) SaveRules(ctx context.Context, storageNamespa } func (m *GarbageCollectionManager) SaveGarbageCollectionCommits(ctx context.Context, repository *graveler.RepositoryRecord, rules *graveler.GarbageCollectionRules) (string, error) { - commitGetter := &RepositoryCommitGetter{ + commitGetter := &repositoryCommitGetter{ refManager: m.refManager, repository: repository, }