Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate stuck invites #369

Merged
merged 2 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions state/migrations/20231108122539_clear_stuck_invites.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package migrations

import (
"context"
"database/sql"
"fmt"

"github.com/lib/pq"
"github.com/pressly/goose/v3"
)

func init() {
goose.AddMigrationContext(upClearStuckInvites, downClearStuckInvites)
}

// The purpose of this migration is to find users who have rooms which have
// not been properly processed by the proxy and invalidate their since token
// so they will do an initial sync on the next poller startup. This is specifically
// targeting stuck invites, where there is an invite in the invites table but
// the room is already joined. This is usually (always?) due to missing a create
// event when the room was joined, caused by a synapse bug outlined in
// https://github.com/matrix-org/sliding-sync/issues/367
// This isn't exclusively a problem with invites, though it manifests more clearly there.
func upClearStuckInvites(ctx context.Context, tx *sql.Tx) error {
// The syncv3_unread table is updated any time A) a room is in rooms.join and B) the unread count has changed,
// where nil != 0. Therefore, we can use this table as a proxy for "have we seen a v2 response which has put this
// room into rooms.join"? For every room in rooms.join, we should have seen a create event for it, and hence have
// an entry in syncv3_rooms. If we do not have an entry in syncv3_rooms but do have an entry in syncv3_unread, this
// implies we failed to properly store this joined room and therefore the user who the unread marker is for should be
// reset to force an initial sync. On matrix.org, of the users using sliding sync, this will catch around ~1.82% of users
rows, err := tx.QueryContext(ctx, `
SELECT distinct(user_id) FROM syncv3_unread
WHERE room_id NOT IN (
SELECT room_id
FROM syncv3_rooms
)
`)
defer rows.Close()
if err != nil {
return fmt.Errorf("failed to select bad users: %w", err)
}

var usersToInvalidate []string
for rows.Next() {
var userID string
err = rows.Scan(&userID)
if err != nil {
return fmt.Errorf("failed to scan user: %w", err)
}
usersToInvalidate = append(usersToInvalidate, userID)
}
logger.Info().Int("len_invalidate_users", len(usersToInvalidate)).Msg("invalidating users")
if len(usersToInvalidate) < 50 {
logger.Info().Strs("invalidate_users", usersToInvalidate).Msg("invalidating users")
}

// for each user:
// - reset their since token for all devices
// - remove any outstanding invites (we'll be told about them again when they initial sync)
res, err := tx.ExecContext(ctx, `
UPDATE syncv3_sync2_devices SET since='' WHERE user_id=ANY($1)
`, pq.StringArray(usersToInvalidate))
if err != nil {
return fmt.Errorf("failed to invalidate since tokens: %w", err)
}
ra, _ := res.RowsAffected()
logger.Info().Int64("num_devices", ra).Msg("reset since tokens")

res, err = tx.ExecContext(ctx, `
DELETE FROM syncv3_invites WHERE user_id=ANY($1)
`, pq.StringArray(usersToInvalidate))
if err != nil {
return fmt.Errorf("failed to remove outstanding invites: %w", err)
}
ra, _ = res.RowsAffected()
logger.Info().Int64("num_invites", ra).Msg("reset invites")
return nil
}

func downClearStuckInvites(ctx context.Context, tx *sql.Tx) error {
// we can't roll this back
return nil
}
232 changes: 232 additions & 0 deletions state/migrations/20231108122539_clear_stuck_invites_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
package migrations

import (
"context"
"encoding/json"
"fmt"
"testing"
"time"

"github.com/jmoiron/sqlx"
"github.com/matrix-org/sliding-sync/sqlutil"
"github.com/matrix-org/sliding-sync/state"
"github.com/matrix-org/sliding-sync/sync2"
)

func TestClearStuckInvites(t *testing.T) {
db, close := connectToDB(t)
defer close()
roomsTable := state.NewRoomsTable(db)
inviteTable := state.NewInvitesTable(db)
unreadTable := state.NewUnreadTable(db)
deviceTable := sync2.NewDevicesTable(db)
tokensTable := sync2.NewTokensTable(db, "secret")

zero := 0
device1 := "TEST_1"
device2 := "TEST_2"
roomA := "!TestClearStuckInvites_a:localhost"
roomB := "!TestClearStuckInvites_b:localhost"
roomC := "!TestClearStuckInvites_c:localhost"
roomD := "!TestClearStuckInvites_d:localhost"
roomE := "!TestClearStuckInvites_e:localhost"
roomF := "!TestClearStuckInvites_f:localhost"
roomG := "!TestClearStuckInvites_g:localhost"
alice := "@TestClearStuckInvites_alice:localhost"
bob := "@TestClearStuckInvites_bob:localhost"
charlie := "@TestClearStuckInvites_charlie:localhost"
doris := "@TestClearStuckInvites_doris:localhost"
users := []string{
alice, bob, charlie, doris,
}

// Test cases:
// Room | In Invite Table? | In Unread Table? | In Room Table? | Comment
// A Y Y Y OK, Genuine invite, proxy in room
// B Y Y N BAD, Stuck invite, proxy never saw join
// C Y N Y OK, Genuine invite, proxy in room, no unread counts (unusual but valid)
// D Y N N OK, Genuine invite, proxy not in room
// E N Y Y OK, Genuine joined room
// F N Y N BAD, Stuck joined room, proxy never saw join
// G N N Y OK, Genuine joined room, no unread counts (unusual but valid)
// - N N N Impossible, room id isn't in any table!
kegsay marked this conversation as resolved.
Show resolved Hide resolved
roomToInfo := map[string]struct {
invitedUser string
unreadUser string
inRoomTable bool
}{
roomA: {
invitedUser: alice,
unreadUser: bob,
inRoomTable: true,
},
roomB: {
invitedUser: bob,
unreadUser: bob,
inRoomTable: false,
},
roomC: {
invitedUser: charlie,
inRoomTable: true,
},
roomD: {
invitedUser: doris,
inRoomTable: false,
},
roomE: {
unreadUser: alice,
inRoomTable: true,
},
roomF: {
unreadUser: doris,
},
roomG: {
inRoomTable: true,
},
}

err := sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
for roomID, info := range roomToInfo {
if info.inRoomTable {
err := roomsTable.Upsert(txn, state.RoomInfo{
ID: roomID,
}, 0, 0)
if err != nil {
return fmt.Errorf("Upsert room: %s", err)
}
}
if info.invitedUser != "" {
err := inviteTable.InsertInvite(info.invitedUser, roomID, []json.RawMessage{json.RawMessage(`{}`)})
if err != nil {
return fmt.Errorf("InsertInvite: %s", err)
}
}
if info.unreadUser != "" {
err := unreadTable.UpdateUnreadCounters(info.unreadUser, roomID, &zero, &zero)
if err != nil {
return fmt.Errorf("UpdateUnreadCounters: %s", err)
}
}
}
for _, userID := range users {
for _, deviceID := range []string{device1, device2} {
// each user has 2 devices
if err := deviceTable.InsertDevice(txn, userID, deviceID); err != nil {
return fmt.Errorf("InsertDevice: %s", err)
}
_, err := tokensTable.Insert(txn, userID+deviceID, userID, deviceID, time.Now())
if err != nil {
return fmt.Errorf("TokensTable.Insert: %s", err)
}
}
}
return nil
})
if err != nil {
t.Fatalf("failed to set up test configuration: %s", err)
}
// set since tokens (this is done without a txn hence cannot be bundled in as the UPDATE would fail)
for _, userID := range users {
for i, deviceID := range []string{device1, device2} {
// each user has 2 devices
since := fmt.Sprintf("since_%d", i)
if err := deviceTable.UpdateDeviceSince(userID, deviceID, since); err != nil {
t.Fatalf("UpdateDeviceSince: %s", err)
}
}
}

t.Log("Run the migration.")
tx, err := db.Beginx()
if err != nil {
t.Fatal(err)
}
if err := upClearStuckInvites(context.Background(), tx.Tx); err != nil {
t.Fatalf("upClearStuckInvites: %s", err)
}
tx.Commit()

// make a new txn for assertions
tx, err = db.Beginx()
if err != nil {
t.Fatal(err)
}

// users in room B (bob) and F (doris) should be reset.
tokens, err := tokensTable.TokenForEachDevice(tx)
if err != nil {
t.Fatalf("TokenForEachDevice: %s", err)
}
wantResults := map[[2]string]struct {
wantSinceReset bool
}{
{bob, device1}: {
wantSinceReset: true,
},
{bob, device2}: {
wantSinceReset: true,
},
{doris, device1}: {
wantSinceReset: true,
},
{doris, device2}: {
wantSinceReset: true,
},
// everyone else should NOT have since reset
{alice, device1}: {
wantSinceReset: false,
},
{alice, device2}: {
wantSinceReset: false,
},
{charlie, device1}: {
wantSinceReset: false,
},
{charlie, device2}: {
wantSinceReset: false,
},
}
for _, tok := range tokens {
key := [2]string{tok.UserID, tok.DeviceID}
want, ok := wantResults[key]
if !ok {
continue // different user in another test?
}
if want.wantSinceReset && tok.Since != "" {
t.Errorf("%s want since reset, got %+v", key, tok)
}
if !want.wantSinceReset && tok.Since == "" {
t.Errorf("%s did not want since reset, got %+v", key, tok)
}
}
// invites for Bob and Doris are gone
for _, userID := range []string{bob, doris} {
got, err := inviteTable.SelectAllInvitesForUser(userID)
if err != nil {
t.Fatalf("SelectAllInvitesForUser: %s", err)
}
if len(got) > 0 {
t.Fatalf("SelectAllInvitesForUser got invites for %s, wanted none: %+v", userID, got)
}
}
kegsay marked this conversation as resolved.
Show resolved Hide resolved
// ensure other invites remain
wantInvites := map[string][]string{
alice: {roomA},
charlie: {roomC},
}
for userID, wantInvitesRooms := range wantInvites {
got, err := inviteTable.SelectAllInvitesForUser(userID)
if err != nil {
t.Fatalf("SelectAllInvitesForUser: %s", err)
}
if len(got) != len(wantInvitesRooms) {
t.Fatalf("SelectAllInvitesForUser got %d invites for %s, wanted %d", len(got), userID, len(wantInvitesRooms))
}
for _, wantRoom := range wantInvitesRooms {
_, exists := got[wantRoom]
if !exists {
t.Fatalf("SelectAllInvitesForUser wanted invite for %s in %s, but it's missing", userID, wantRoom)
}
}
}
}
Loading