Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cache invalidation tests (rejoining rooms) #377

Merged
merged 5 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (s *Storage) ResetMetadataState(metadata *internal.RoomMetadata) error {
FROM syncv3_events JOIN snapshot ON (
event_nid = ANY (ARRAY_CAT(events, membership_events))
)
WHERE (event_type IN ('m.room.name', 'm.room.avatar', 'm.room.canonical_alias') AND state_key = '')
WHERE (event_type IN ('m.room.name', 'm.room.avatar', 'm.room.canonical_alias', 'm.room.encryption') AND state_key = '')
OR (event_type = 'm.room.member' AND membership IN ('join', '_join', 'invite', '_invite'))
ORDER BY event_nid ASC
;`, metadata.RoomID)
Expand All @@ -334,9 +334,11 @@ func (s *Storage) ResetMetadataState(metadata *internal.RoomMetadata) error {
case "m.room.name":
metadata.NameEvent = gjson.GetBytes(ev.JSON, "content.name").Str
case "m.room.avatar":
metadata.AvatarEvent = gjson.GetBytes(ev.JSON, "content.avatar_url").Str
metadata.AvatarEvent = gjson.GetBytes(ev.JSON, "content.url").Str
case "m.room.canonical_alias":
metadata.CanonicalAlias = gjson.GetBytes(ev.JSON, "content.alias").Str
case "m.room.encryption":
metadata.Encrypted = true
case "m.room.member":
heroMemberships.append(&events[i])
switch ev.Membership {
Expand Down Expand Up @@ -365,7 +367,7 @@ func (s *Storage) ResetMetadataState(metadata *internal.RoomMetadata) error {
metadata.Heroes = append(metadata.Heroes, hero)
}

// For now, don't bother reloading Encrypted, PredecessorID and UpgradedRoomID.
// For now, don't bother reloading PredecessorID and UpgradedRoomID.
// These shouldn't be changing during a room's lifetime in normal operation.
kegsay marked this conversation as resolved.
Show resolved Hide resolved

// We haven't updated LatestEventsByType because that's not part of the timeline.
Expand Down Expand Up @@ -751,6 +753,16 @@ func (s *Storage) LatestEventsInRooms(userID string, roomIDs []string, to int64,
return result, err
}

func (s *Storage) GetClosestPrevBatch(roomID string, eventNID int64) (prevBatch string) {
var err error
sqlutil.WithTransaction(s.DB, func(txn *sqlx.Tx) error {
// discard the error, we don't care if we fail as it's best effort
prevBatch, err = s.EventsTable.SelectClosestPrevBatch(txn, roomID, eventNID)
return err
})
return
}

// visibleEventNIDsBetweenForRooms determines which events a given user has permission to see.
// It accepts a nid range [from, to]. For each given room, it calculates the NID range
// [A1, B1] within [from, to] in which the user has permission to see events.
Expand Down
15 changes: 13 additions & 2 deletions sync3/caches/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ type UserCacheListener interface {
OnUpdate(ctx context.Context, up Update)
}

// Subset of store functions used by the user cache
type UserCacheStore interface {
LatestEventsInRooms(userID string, roomIDs []string, to int64, limit int) (map[string]*state.LatestEvents, error)
GetClosestPrevBatch(roomID string, eventNID int64) (prevBatch string)
}

// Tracks data specific to a given user. Specifically, this is the map of room ID to UserRoomData.
// This data is user-scoped, not global or connection scoped.
type UserCache struct {
Expand All @@ -185,14 +191,14 @@ type UserCache struct {
listeners map[int]UserCacheListener
listenersMu *sync.RWMutex
id int
store *state.Storage
store UserCacheStore
globalCache *GlobalCache
txnIDs TransactionIDFetcher
ignoredUsers map[string]struct{}
ignoredUsersMu *sync.RWMutex
}

func NewUserCache(userID string, globalCache *GlobalCache, store *state.Storage, txnIDs TransactionIDFetcher) *UserCache {
func NewUserCache(userID string, globalCache *GlobalCache, store UserCacheStore, txnIDs TransactionIDFetcher) *UserCache {
// see SyncLiveHandler.userCache for the initialisation proper, which works by
// firing off a bunch of OnBlahBlah callbacks.
uc := &UserCache{
Expand Down Expand Up @@ -415,6 +421,11 @@ func (c *UserCache) Invites() map[string]UserRoomData {
return invites
}

// AttemptToFetchPrevBatch tries to find a prev_batch value for the given event. This may not always succeed.
func (c *UserCache) AttemptToFetchPrevBatch(roomID string, firstTimelineEvent *EventData) (prevBatch string) {
return c.store.GetClosestPrevBatch(roomID, firstTimelineEvent.NID)
}

// AnnotateWithTransactionIDs should be called just prior to returning events to the client. This
// will modify the events to insert the correct transaction IDs if needed. This is required because
// events are globally scoped, so if Alice sends a message, Bob might receive it first on his v2 loop
Expand Down
8 changes: 7 additions & 1 deletion sync3/handler/connstate_live.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,13 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update,
roomIDtoTimeline := s.userCache.AnnotateWithTransactionIDs(ctx, s.userID, s.deviceID, map[string][]json.RawMessage{
roomEventUpdate.RoomID(): {roomEventUpdate.EventData.Event},
})
if len(r.Timeline) == 0 && r.PrevBatch == "" {
// attempt to fill in the prev_batch value for this room
prevBatch := s.userCache.AttemptToFetchPrevBatch(roomEventUpdate.RoomID(), roomEventUpdate.EventData)
if prevBatch != "" {
r.PrevBatch = prevBatch
}
}
r.Timeline = append(r.Timeline, roomIDtoTimeline[roomEventUpdate.RoomID()]...)
roomID := roomEventUpdate.RoomID()
sender := roomEventUpdate.EventData.Sender
Expand Down Expand Up @@ -278,7 +285,6 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update,
if delta.JoinCountChanged {
thisRoom.JoinedCount = roomUpdate.GlobalRoomMetadata().JoinCount
}

response.Rooms[roomUpdate.RoomID()] = thisRoom
}
if delta.HighlightCountChanged || delta.NotificationCountChanged {
Expand Down
20 changes: 15 additions & 5 deletions sync3/handler/connstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/state"
"reflect"
"testing"
"time"

"github.com/matrix-org/sliding-sync/state"

"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sync3"
Expand All @@ -26,6 +27,15 @@ func (h *NopExtensionHandler) Handle(ctx context.Context, req extensions.Request
func (h *NopExtensionHandler) HandleLiveUpdate(ctx context.Context, update caches.Update, req extensions.Request, res *extensions.Response, extCtx extensions.Context) {
}

type NopUserCacheStore struct{}

func (s *NopUserCacheStore) GetClosestPrevBatch(roomID string, eventNID int64) (prevBatch string) {
return
}
func (s *NopUserCacheStore) LatestEventsInRooms(userID string, roomIDs []string, to int64, limit int) (map[string]*state.LatestEvents, error) {
return nil, nil
}

type NopJoinTracker struct{}

func (t *NopJoinTracker) IsUserJoined(userID, roomID string) bool {
Expand Down Expand Up @@ -96,7 +106,7 @@ func TestConnStateInitial(t *testing.T) {
roomC.RoomID: {NID: 780, Timestamp: 789},
}, nil, nil
}
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
userCache := caches.NewUserCache(userID, globalCache, &NopUserCacheStore{}, &NopTransactionFetcher{})
dispatcher.Register(context.Background(), userCache.UserID, userCache)
dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
userCache.LazyLoadTimelinesOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
Expand Down Expand Up @@ -269,7 +279,7 @@ func TestConnStateMultipleRanges(t *testing.T) {
}
return 1, roomMetadata, joinTimings, nil, nil
}
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
userCache := caches.NewUserCache(userID, globalCache, &NopUserCacheStore{}, &NopTransactionFetcher{})
userCache.LazyLoadTimelinesOverride = mockLazyRoomOverride
dispatcher.Register(context.Background(), userCache.UserID, userCache)
dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
Expand Down Expand Up @@ -448,7 +458,7 @@ func TestBumpToOutsideRange(t *testing.T) {
}, nil, nil

}
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
userCache := caches.NewUserCache(userID, globalCache, &NopUserCacheStore{}, &NopTransactionFetcher{})
userCache.LazyLoadTimelinesOverride = mockLazyRoomOverride
dispatcher.Register(context.Background(), userCache.UserID, userCache)
dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
Expand Down Expand Up @@ -551,7 +561,7 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
roomD.RoomID: {NID: 4, Timestamp: 4},
}, nil, nil
}
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
userCache := caches.NewUserCache(userID, globalCache, &NopUserCacheStore{}, &NopTransactionFetcher{})
userCache.LazyLoadTimelinesOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
result := make(map[string]state.LatestEvents)
for _, roomID := range roomIDs {
Expand Down
6 changes: 5 additions & 1 deletion tests-e2e/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func (c *CSAPI) Scrollback(t *testing.T, roomID, prevBatch string, limit int) gj
func (c *CSAPI) SlidingSync(t *testing.T, data sync3.Request, opts ...client.RequestOpt) (resBody *sync3.Response) {
t.Helper()
res := c.DoSlidingSync(t, data, opts...)
if res.StatusCode != 200 {
t.Fatalf("SlidingSync returned %v", res.Status)
}
body := client.ParseJSON(t, res)
if err := json.Unmarshal(body, &resBody); err != nil {
t.Fatalf("failed to unmarshal response: %v", err)
Expand Down Expand Up @@ -194,7 +197,8 @@ func (c *CSAPI) SlidingSyncUntilEvent(t *testing.T, pos string, data sync3.Reque
return nil
}
}
return fmt.Errorf("found room %s but missing event", roomID)
b, _ := json.Marshal(room.Timeline)
return fmt.Errorf("found room %s but missing event, timeline=%v", roomID, string(b))
})
}

Expand Down
Loading
Loading