Skip to content

Commit

Permalink
Expand grants faster. (#255)
Browse files Browse the repository at this point in the history
* Use shorter JSON tags so we serialize less data when checkpointing. Update the grant expansion benchmark to create more groups and return correct data in ListEntitlements and ListGrants.

* Only checkpoint if we haven't checkpointed within the last 10 seconds.

This drastically reduces the amount of time we spend serializing and saving state.

* Optimize case where we've fixed all cycles. Increase circleSize to 10.

* Fix behavior of ListResources() to return resources of specified type.

* Clean up benchmark. Use functions to add users, groups, and group members instead of directly messing with the data.

* Make a loop of parent groups. Reduce circle size to 7 so we don't hit maxDepth in syncer and results are consistent.

* Check if lastCheckPointTime is zero. Oops.

* Enable the SQLite write-ahead log (WAL). This greatly improves performance when writing lots of grants in service mode.
  • Loading branch information
ggreer authored Nov 25, 2024
1 parent 50a56ee commit fe297ca
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 93 deletions.
2 changes: 1 addition & 1 deletion pkg/dotc1z/manager/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (l *localManager) LoadC1Z(ctx context.Context) (*dotc1z.C1File, error) {
zap.String("temp_path", l.tmpPath),
)

return dotc1z.NewC1ZFile(ctx, l.tmpPath, dotc1z.WithTmpDir(l.tmpDir))
return dotc1z.NewC1ZFile(ctx, l.tmpPath, dotc1z.WithTmpDir(l.tmpDir), dotc1z.WithPragma("journal_mode", "WAL"))
}

// SaveC1Z saves the C1Z file to the local file system.
Expand Down
7 changes: 7 additions & 0 deletions pkg/sync/expand/cycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
// exists. Returns nil if no cycle exists. If there is a single
// node pointing to itself, that will count as a cycle.
func (g *EntitlementGraph) GetFirstCycle() []int {
if g.HasNoCycles {
return nil
}
visited := mapset.NewSet[int]()
for nodeID := range g.Nodes {
cycle, hasCycle := g.cycleDetectionHelper(nodeID, visited, []int{})
Expand Down Expand Up @@ -87,8 +90,12 @@ func (g *EntitlementGraph) removeNode(nodeID int) {
// FixCycles checks whether any cycle of nodes exists; if so, it merges all nodes in that cycle into a
// single node and then repeats. Iteration ends when there are no more cycles.
func (g *EntitlementGraph) FixCycles() error {
if g.HasNoCycles {
return nil
}
cycle := g.GetFirstCycle()
if cycle == nil {
g.HasNoCycles = true
return nil
}

Expand Down
31 changes: 19 additions & 12 deletions pkg/sync/expand/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,29 @@ import (
"go.uber.org/zap"
)

// JSON tags for actions, edges, and nodes are kept short to minimize the size of serialized data when checkpointing.

type EntitlementGraphAction struct {
SourceEntitlementID string `json:"source_entitlement_id"`
DescendantEntitlementID string `json:"descendant_entitlement_id"`
Shallow bool `json:"shallow"`
ResourceTypeIDs []string `json:"resource_types_ids"`
PageToken string `json:"page_token"`
SourceEntitlementID string `json:"sid"`
DescendantEntitlementID string `json:"did"`
Shallow bool `json:"s"`
ResourceTypeIDs []string `json:"rtids"`
PageToken string `json:"pt"`
}

type Edge struct {
EdgeID int `json:"edge_id"`
SourceID int `json:"source_id"`
DestinationID int `json:"destination_id"`
IsExpanded bool `json:"expanded"`
IsShallow bool `json:"shallow"`
ResourceTypeIDs []string `json:"resource_type_ids"`
EdgeID int `json:"id"`
SourceID int `json:"sid"`
DestinationID int `json:"did"`
IsExpanded bool `json:"e"`
IsShallow bool `json:"s"`
ResourceTypeIDs []string `json:"rtids"`
}

// Node represents a list of entitlements. It is the base element of the graph.
type Node struct {
Id int `json:"id"`
EntitlementIDs []string `json:"entitlementIds"` // List of entitlements.
EntitlementIDs []string `json:"eids"` // List of entitlements.
}

// EntitlementGraph - a directed graph representing the relationships between
Expand All @@ -47,6 +49,7 @@ type EntitlementGraph struct {
Loaded bool `json:"loaded"`
Depth int `json:"depth"`
Actions []EntitlementGraphAction `json:"actions"`
HasNoCycles bool `json:"has_no_cycles"`
}

func NewEntitlementGraph(_ context.Context) *EntitlementGraph {
Expand All @@ -56,6 +59,7 @@ func NewEntitlementGraph(_ context.Context) *EntitlementGraph {
EntitlementsToNodes: make(map[string]int),
Nodes: make(map[int]Node),
SourcesToDestinations: make(map[int]map[int]int),
HasNoCycles: false,
}
}

Expand Down Expand Up @@ -155,6 +159,7 @@ func (g *EntitlementGraph) AddEntitlement(entitlement *v2.Entitlement) {
if found != nil {
return
}
g.HasNoCycles = false // Reset this since we're changing the graph.

// Start at 1 in case we don't initialize something and try to get node 0.
g.NextNodeID++
Expand Down Expand Up @@ -265,6 +270,8 @@ func (g *EntitlementGraph) AddEdge(
g.DestinationsToSources[dstNode.Id] = make(map[int]int)
}

g.HasNoCycles = false // Reset this since we're changing the graph.

// Start at 1 in case we don't initialize something and try to get edge 0.
g.NextEdgeID++
edge := Edge{
Expand Down
27 changes: 17 additions & 10 deletions pkg/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,29 @@ type Syncer interface {

// syncer orchestrates a connector sync and stores the results using the provided connectorstore.Writer.
type syncer struct {
c1zManager manager.Manager
c1zPath string
store connectorstore.Writer
connector types.ConnectorClient
state State
runDuration time.Duration
transitionHandler func(s Action)
progressHandler func(p *Progress)
tmpDir string
skipFullSync bool
c1zManager manager.Manager
c1zPath string
store connectorstore.Writer
connector types.ConnectorClient
state State
runDuration time.Duration
transitionHandler func(s Action)
progressHandler func(p *Progress)
tmpDir string
skipFullSync bool
lastCheckPointTime time.Time

skipEGForResourceType map[string]bool
}

// minCheckpointInterval is the shortest time that must pass after one
// checkpoint before Checkpoint will marshal and store state again.
const minCheckpointInterval time.Duration = 10 * time.Second

// Checkpoint marshals the current state and stores it. It is a no-op if the previous checkpoint happened less than minCheckpointInterval ago.
func (s *syncer) Checkpoint(ctx context.Context) error {
if !s.lastCheckPointTime.IsZero() && time.Since(s.lastCheckPointTime) < minCheckpointInterval {
return nil
}
s.lastCheckPointTime = time.Now()
checkpoint, err := s.state.Marshal()
if err != nil {
return err
Expand Down
181 changes: 111 additions & 70 deletions pkg/sync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sync

import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
Expand All @@ -16,88 +17,62 @@ import (
"google.golang.org/grpc"
)

// Resource types used by the mock connector helpers and the benchmark in this file.
var (
	// groupResourceType is the "group" resource type; it carries the group trait.
	groupResourceType = &v2.ResourceType{
		Id:          "group",
		DisplayName: "Group",
		Traits:      []v2.ResourceType_Trait{v2.ResourceType_TRAIT_GROUP},
	}

	// userResourceType is the "user" resource type; it carries the user trait
	// and a SkipEntitlementsAndGrants annotation.
	userResourceType = &v2.ResourceType{
		Id:          "user",
		DisplayName: "User",
		Traits:      []v2.ResourceType_Trait{v2.ResourceType_TRAIT_USER},
		Annotations: annotations.New(&v2.SkipEntitlementsAndGrants{}),
	}
)

func BenchmarkExpandCircle(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// create a loop of N entitlements
circleSize := 9
circleSize := 7
// with different principal + grants at each layer
usersPerLayer := 100
groupCount := 100

mc := newMockConnector()

groupResourceType := &v2.ResourceType{
Id: "group",
DisplayName: "Group",
Traits: []v2.ResourceType_Trait{v2.ResourceType_TRAIT_GROUP},
}
userResourceType := &v2.ResourceType{
Id: "user",
DisplayName: "User",
Traits: []v2.ResourceType_Trait{v2.ResourceType_TRAIT_USER},
Annotations: annotations.New(&v2.SkipEntitlementsAndGrants{}),
}
mc.rtDB = append(mc.rtDB, groupResourceType, userResourceType)

for i := 0; i < circleSize; i++ {
resoruceId := "g_" + strconv.Itoa(i)
resoruce, err := rs.NewGroupResource(
resoruceId,
groupResourceType,
resoruceId,
[]rs.GroupTraitOption{},
)
for i := 0; i < groupCount; i++ {
groupId := "group_" + strconv.Itoa(i)
group, _, err := mc.AddGroup(ctx, groupId)
require.NoError(b, err)

mc.resourceDB = append(mc.resourceDB, resoruce)
childGroupId := "child_group_" + strconv.Itoa(i)
childGroup, childEnt, err := mc.AddGroup(ctx, childGroupId)
require.NoError(b, err)

ent := et.NewAssignmentEntitlement(
resoruce,
"member",
et.WithGrantableTo(groupResourceType, userResourceType),
)
ent.Slug = "member"
_ = mc.AddGroupMember(ctx, group, childGroup, childEnt)

mc.entDB = append(mc.entDB, ent)
for j := 0; j < usersPerLayer; j++ {
pid := "u_" + strconv.Itoa(i*usersPerLayer+j)
principal, err := rs.NewUserResource(
pid,
userResourceType,
pid,
[]rs.UserTraitOption{},
rs.WithAnnotation(&v2.SkipEntitlementsAndGrants{}),
)
pid := "user_" + strconv.Itoa(i*usersPerLayer+j)
principal, err := mc.AddUser(ctx, pid)
require.NoError(b, err)
mc.userDB = append(mc.userDB, principal)

grant := gt.NewGrant(
resoruce,
"member",
principal,
)
mc.grantDB = append(mc.grantDB, grant)

// This isn't needed because grant expansion will create this grant
// _ = mc.AddGroupMember(ctx, group, principal)
_ = mc.AddGroupMember(ctx, childGroup, principal)
}
}

// create the circle
for i := 0; i < circleSize; i++ {
currentEnt := mc.entDB[i]
nextEnt := mc.entDB[(i+1)%circleSize] // Wrap around to the start for the last element

grant := gt.NewGrant(
nextEnt.Resource,
"member",
currentEnt.Resource,
gt.WithAnnotation(&v2.GrantExpandable{
EntitlementIds: []string{
currentEnt.Id,
},
}),
)

mc.grantDB = append(mc.grantDB, grant)
for i := 0; i < circleSize; i += 1 {
groupId := "group_" + strconv.Itoa(i)
nextGroupId := "group_" + strconv.Itoa((i+1)%circleSize) // Wrap around to the start for the last element
currentEnt := mc.entDB[groupId][0]
nextEnt := mc.entDB[nextGroupId][0]

_ = mc.AddGroupMember(ctx, currentEnt.Resource, nextEnt.Resource, nextEnt)
}

tempDir, err := os.MkdirTemp("", "baton-benchmark-expand-circle")
Expand All @@ -120,9 +95,9 @@ func newMockConnector() *mockConnector {
mc := &mockConnector{
rtDB: make([]*v2.ResourceType, 0),
resourceDB: make([]*v2.Resource, 0),
entDB: make([]*v2.Entitlement, 0),
entDB: make(map[string][]*v2.Entitlement),
userDB: make([]*v2.Resource, 0),
grantDB: make([]*v2.Grant, 0),
grantDB: make(map[string][]*v2.Grant),
}
return mc
}
Expand All @@ -131,9 +106,9 @@ type mockConnector struct {
metadata *v2.ConnectorMetadata
rtDB []*v2.ResourceType
resourceDB []*v2.Resource
entDB []*v2.Entitlement
entDB map[string][]*v2.Entitlement // resource id to entitlements
userDB []*v2.Resource
grantDB []*v2.Grant
grantDB map[string][]*v2.Grant // resource id to grants
v2.AssetServiceClient
v2.GrantManagerServiceClient
v2.ResourceManagerServiceClient
Expand All @@ -143,23 +118,89 @@ type mockConnector struct {
v2.TicketsServiceClient
}

// AddGroup creates a group resource identified by groupId, records it in
// mc.resourceDB, and creates a "member" assignment entitlement on it that is
// grantable to both the group and user resource types. The entitlement is
// appended to mc.entDB under groupId.
//
// It returns the new group resource and its entitlement. If the resource
// cannot be built, the error is returned and nothing is recorded. ctx is
// currently unused.
func (mc *mockConnector) AddGroup(ctx context.Context, groupId string) (*v2.Resource, *v2.Entitlement, error) {
	const memberSlug = "member"

	// groupId is passed as both the first and third argument of NewGroupResource.
	groupResource, err := rs.NewGroupResource(groupId, groupResourceType, groupId, []rs.GroupTraitOption{})
	if err != nil {
		return nil, nil, err
	}
	mc.resourceDB = append(mc.resourceDB, groupResource)

	grantable := et.WithGrantableTo(groupResourceType, userResourceType)
	memberEnt := et.NewAssignmentEntitlement(groupResource, memberSlug, grantable)
	memberEnt.Slug = memberSlug

	// entDB is keyed by resource ID so ListEntitlements can look up per resource.
	mc.entDB[groupId] = append(mc.entDB[groupId], memberEnt)

	return groupResource, memberEnt, nil
}

// AddUser creates a user resource identified by userId, annotated with
// SkipEntitlementsAndGrants, and appends it to mc.userDB.
//
// It returns the new user resource, or the construction error (in which case
// nothing is recorded). ctx is currently unused.
func (mc *mockConnector) AddUser(ctx context.Context, userId string) (*v2.Resource, error) {
	skipEntsAndGrants := rs.WithAnnotation(&v2.SkipEntitlementsAndGrants{})

	// userId is passed as both the first and third argument of NewUserResource.
	userResource, err := rs.NewUserResource(userId, userResourceType, userId, []rs.UserTraitOption{}, skipEntsAndGrants)
	if err != nil {
		return nil, err
	}

	mc.userDB = append(mc.userDB, userResource)
	return userResource, nil
}

// AddGroupMember creates a "member" grant of resource to principal and stores
// it in mc.grantDB under the resource's ID (resource.Id.Resource).
//
// For every entitlement in expandEnts, a separate GrantExpandable annotation
// naming that entitlement's ID is attached to the grant. With no expandEnts
// the grant carries no such annotation.
//
// It returns the newly created grant. ctx is currently unused.
func (mc *mockConnector) AddGroupMember(ctx context.Context, resource *v2.Resource, principal *v2.Resource, expandEnts ...*v2.Entitlement) *v2.Grant {
	opts := make([]gt.GrantOption, 0, len(expandEnts))
	for i := range expandEnts {
		expandable := &v2.GrantExpandable{
			EntitlementIds: []string{expandEnts[i].Id},
		}
		opts = append(opts, gt.WithAnnotation(expandable))
	}

	membership := gt.NewGrant(resource, "member", principal, opts...)

	// grantDB is keyed by resource ID so ListGrants can look up per resource.
	key := resource.Id.Resource
	mc.grantDB[key] = append(mc.grantDB[key], membership)

	return membership
}

// ListResourceTypes returns every resource type registered in mc.rtDB. The
// context, request, and call options are ignored, and the error is always nil.
func (mc *mockConnector) ListResourceTypes(context.Context, *v2.ResourceTypesServiceListResourceTypesRequest, ...grpc.CallOption) (*v2.ResourceTypesServiceListResourceTypesResponse, error) {
	resp := &v2.ResourceTypesServiceListResourceTypesResponse{}
	resp.List = mc.rtDB
	return resp, nil
}

func (mc *mockConnector) ListResources(ctx context.Context, in *v2.ResourcesServiceListResourcesRequest, opts ...grpc.CallOption) (*v2.ResourcesServiceListResourcesResponse, error) {
all := make([]*v2.Resource, 0, len(mc.resourceDB)+len(mc.userDB))
all = append(all, mc.resourceDB...)
all = append(all, mc.userDB...)
return &v2.ResourcesServiceListResourcesResponse{List: all}, nil
if in.ResourceTypeId == "group" {
return &v2.ResourcesServiceListResourcesResponse{List: mc.resourceDB}, nil
}
if in.ResourceTypeId == "user" {
return &v2.ResourcesServiceListResourcesResponse{List: mc.userDB}, nil
}
return nil, fmt.Errorf("unknown resource type %s", in.ResourceTypeId)
}

func (mc *mockConnector) ListEntitlements(ctx context.Context, in *v2.EntitlementsServiceListEntitlementsRequest, opts ...grpc.CallOption) (*v2.EntitlementsServiceListEntitlementsResponse, error) {
return &v2.EntitlementsServiceListEntitlementsResponse{List: mc.entDB}, nil
return &v2.EntitlementsServiceListEntitlementsResponse{List: mc.entDB[in.Resource.Id.Resource]}, nil
}

func (mc *mockConnector) ListGrants(ctx context.Context, in *v2.GrantsServiceListGrantsRequest, opts ...grpc.CallOption) (*v2.GrantsServiceListGrantsResponse, error) {
return &v2.GrantsServiceListGrantsResponse{List: mc.grantDB}, nil
return &v2.GrantsServiceListGrantsResponse{List: mc.grantDB[in.Resource.Id.Resource]}, nil
}

func (mc *mockConnector) GetMetadata(ctx context.Context, in *v2.ConnectorServiceGetMetadataRequest, opts ...grpc.CallOption) (*v2.ConnectorServiceGetMetadataResponse, error) {
Expand Down

0 comments on commit fe297ca

Please sign in to comment.