Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle gappy state with a single snapshot, attempt 3 #363

Merged
merged 12 commits into from
Nov 10, 2023
272 changes: 176 additions & 96 deletions state/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,69 +141,62 @@ type InitialiseResult struct {
// AddedEvents is true iff this call to Initialise added new state events to the DB.
AddedEvents bool
// SnapshotID is the ID of the snapshot which incorporates all added events.
// It has no meaning if AddedEvents is False.
// It has no meaning if AddedEvents is false.
SnapshotID int64
// PrependTimelineEvents is empty if the room was not initialised prior to this call.
// Otherwise, it is an order-preserving subset of the `state` argument to Initialise
// containing all events that were not persisted prior to the Initialise call. These
// should be prepended to the room timeline by the caller.
PrependTimelineEvents []json.RawMessage
// ReplacedExistingSnapshot is true when we created a new snapshot for the room and
// there a pre-existing room snapshot. It has no meaning if AddedEvents is false.
ReplacedExistingSnapshot bool
}

// Initialise starts a new sync accumulator for the given room using the given state as a baseline.
// Initialise processes the state block of a V2 sync response for a particular room. If
// the state of the room has changed, we persist any new state events and create a new
// "snapshot" of its entire state.
//
// This will only take effect if this is the first time the v3 server has seen this room, and it wasn't
// possible to get all events up to the create event (e.g Matrix HQ).
// This function:
// - Stores these events
// - Sets up the current snapshot based on the state list given.
// Summary of the logic:
//
// If the v3 server has seen this room before, this function
// - queries the DB to determine which state events are known to th server,
// - returns (via InitialiseResult.PrependTimelineEvents) a slice of unknown state events,
// 0. Ensure the state block is not empty.
//
// and otherwise does nothing.
// 1. Capture the current snapshot ID, possibly zero. If it is zero, ensure that the
// state block contains a `create event`.
//
// 2. Insert the events. If there are no newly inserted events, bail. If there are new
// events, then the state block has definitely changed. Note: we ignore cases where
// the state has only changed to a known subset of state events (i.e in the case of
// state resets, slow pollers) as it is impossible to then reconcile that state with
// any new events, as any "catchup" state will be ignored due to the events already
// existing.
//
// 3. Fetch the current state of the room, as a map from (type, state_key) to event.
// If there is no existing state snapshot, this map is the empty map.
// If the state hasn't altered, bail.
//
// 4. Create new snapshot. Update the map from (3) with the events in `state`.
// (There is similar logic for this in Accumulate.)
// Store the snapshot. Mark the room's current state as being this snapshot.
//
// 5. Any other processing of the new state events.
//
// 6. Return an "AddedEvents" bool (if true, emit an Initialise payload) and a
// "ReplacedSnapshot" bool (if true, emit a cache invalidation payload).

func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (InitialiseResult, error) {
var res InitialiseResult
var startingSnapshotID int64

// 0. Ensure the state block is not empty.
if len(state) == 0 {
return res, nil
}
err := sqlutil.WithTransaction(a.db, func(txn *sqlx.Tx) error {
err := sqlutil.WithTransaction(a.db, func(txn *sqlx.Tx) (err error) {
// 1. Capture the current snapshot ID, checking for a create event if this is our first snapshot.

// Attempt to short-circuit. This has to be done inside a transaction to make sure
// we don't race with multiple calls to Initialise with the same room ID.
snapshotID, err := a.roomsTable.CurrentAfterSnapshotID(txn, roomID)
startingSnapshotID, err = a.roomsTable.CurrentAfterSnapshotID(txn, roomID)
if err != nil {
return fmt.Errorf("error fetching snapshot id for room %s: %s", roomID, err)
}
if snapshotID > 0 {
// Poller A has received a gappy sync v2 response with a state block, and
// we have seen this room before. If we knew for certain that there is some
// other active poller B in this room then we could safely skip this logic.

// Log at debug for now. If we find an unknown event, we'll return it so
// that the poller can log a warning.
logger.Debug().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called with incremental state but current snapshot already exists.")
eventIDs := make([]string, len(state))
eventIDToRawEvent := make(map[string]json.RawMessage, len(state))
for i := range state {
eventID := gjson.ParseBytes(state[i]).Get("event_id")
if !eventID.Exists() || eventID.Type != gjson.String {
return fmt.Errorf("Event %d lacks an event ID", i)
}
eventIDToRawEvent[eventID.Str] = state[i]
eventIDs[i] = eventID.Str
}
unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs)
if err != nil {
return fmt.Errorf("error determing which event IDs are unknown: %s", err)
}
for unknownEventID := range unknownEventIDs {
res.PrependTimelineEvents = append(res.PrependTimelineEvents, eventIDToRawEvent[unknownEventID])
}
return nil
return fmt.Errorf("error fetching snapshot id for room %s: %w", roomID, err)
}

// We don't have a snapshot for this room. Parse the events first.
// Start by parsing the events in the state block.
events := make([]Event, len(state))
for i := range events {
events[i] = Event{
Expand All @@ -214,77 +207,77 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia
}
events = filterAndEnsureFieldsSet(events)
if len(events) == 0 {
return fmt.Errorf("failed to insert events, all events were filtered out: %w", err)
return fmt.Errorf("failed to parse state block, all events were filtered out: %w", err)
}

// Before proceeding further, ensure that we have "proper" state and not just a
// single stray event by looking for the create event.
hasCreate := false
for _, e := range events {
if e.Type == "m.room.create" && e.StateKey == "" {
hasCreate = true
break
if startingSnapshotID == 0 {
// Ensure that we have "proper" state and not "stray" events from Synapse.
if err = ensureStateHasCreateEvent(events); err != nil {
return err
}
}
if !hasCreate {
const errMsg = "cannot create first snapshot without a create event"
sentry.WithScope(func(scope *sentry.Scope) {
scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
"room_id": roomID,
"len_state": len(events),
})
sentry.CaptureMessage(errMsg)
})
logger.Warn().
Str("room_id", roomID).
Int("len_state", len(events)).
Msg(errMsg)
// the HS gave us bad data so there's no point retrying => return DataError
return internal.NewDataError(errMsg)
}

// Insert the events.
eventIDToNID, err := a.eventsTable.Insert(txn, events, false)
// 2. Insert the events and determine which ones are new.
newEventIDToNID, err := a.eventsTable.Insert(txn, events, false)
if err != nil {
return fmt.Errorf("failed to insert events: %w", err)
}
if len(eventIDToNID) == 0 {
// we don't have a current snapshot for this room but yet no events are new,
// no idea how this should be handled.
const errMsg = "Accumulator.Initialise: room has no current snapshot but also no new inserted events, doing nothing. This is probably a bug."
logger.Error().Str("room_id", roomID).Msg(errMsg)
sentry.CaptureException(fmt.Errorf(errMsg))
if len(newEventIDToNID) == 0 {
if startingSnapshotID == 0 {
// we don't have a current snapshot for this room but yet no events are new,
// no idea how this should be handled.
const errMsg = "Accumulator.Initialise: room has no current snapshot but also no new inserted events, doing nothing. This is probably a bug."
logger.Error().Str("room_id", roomID).Msg(errMsg)
sentry.CaptureException(fmt.Errorf(errMsg))
}
// Note: we otherwise ignore cases where the state has only changed to a
// known subset of state events (i.e in the case of state resets, slow
// pollers) as it is impossible to then reconcile that state with
// any new events, as any "catchup" state will be ignored due to the events
// already existing.
return nil
}

// pull out the event NIDs we just inserted
membershipEventIDs := make(map[string]struct{}, len(events))
newEvents := make([]Event, 0, len(newEventIDToNID))
for _, event := range events {
if event.Type == "m.room.member" {
membershipEventIDs[event.ID] = struct{}{}
newNid, isNew := newEventIDToNID[event.ID]
if isNew {
event.NID = newNid
newEvents = append(newEvents, event)
}
}
memberNIDs := make([]int64, 0, len(eventIDToNID))
otherNIDs := make([]int64, 0, len(eventIDToNID))
for evID, nid := range eventIDToNID {
if _, exists := membershipEventIDs[evID]; exists {
memberNIDs = append(memberNIDs, int64(nid))
} else {
otherNIDs = append(otherNIDs, int64(nid))

// 3. Fetch the current state of the room.
var currentState stateMap
if startingSnapshotID > 0 {
currentState, err = a.stateMapAtSnapshot(txn, startingSnapshotID)
if err != nil {
return fmt.Errorf("failed to load state map: %w", err)
}
} else {
currentState = stateMap{
// Typically expect Other to be small, but Memberships may be large (think: Matrix HQ.)
Memberships: make(map[string]int64, len(events)),
Other: make(map[[2]string]int64),
}
}

// Make a current snapshot
// 4. Update the map from (3) with the new events to create a new snapshot.
for _, ev := range newEvents {
currentState.Ingest(ev)
}
memberNIDs, otherNIDs := currentState.NIDs()
snapshot := &SnapshotRow{
RoomID: roomID,
MembershipEvents: pq.Int64Array(memberNIDs),
OtherEvents: pq.Int64Array(otherNIDs),
MembershipEvents: memberNIDs,
OtherEvents: otherNIDs,
}
err = a.snapshotTable.Insert(txn, snapshot)
if err != nil {
return fmt.Errorf("failed to insert snapshot: %w", err)
}
res.AddedEvents = true

// 5. Any other processing of new state events.
latestNID := int64(0)
for _, nid := range otherNIDs {
if nid > latestNID {
Expand Down Expand Up @@ -313,8 +306,16 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia
// will have an associated state snapshot ID on the event.

// Set the snapshot ID as the current state
err = a.roomsTable.Upsert(txn, info, snapshot.SnapshotID, latestNID)
if err != nil {
return err
}

// 6. Tell the caller what happened, so they know what payloads to emit.
res.SnapshotID = snapshot.SnapshotID
return a.roomsTable.Upsert(txn, info, snapshot.SnapshotID, latestNID)
res.AddedEvents = true
res.ReplacedExistingSnapshot = startingSnapshotID > 0
return nil
})
return res, err
}
Expand Down Expand Up @@ -652,3 +653,82 @@ func (a *Accumulator) filterToNewTimelineEvents(txn *sqlx.Tx, dedupedEvents []Ev
// A is seen event s[A,B,C] => s[0+1:] => [B,C]
return dedupedEvents[seenIndex+1:], nil
}

func ensureStateHasCreateEvent(events []Event) error {
hasCreate := false
for _, e := range events {
if e.Type == "m.room.create" && e.StateKey == "" {
hasCreate = true
break
}
}
if !hasCreate {
const errMsg = "cannot create first snapshot without a create event"
sentry.WithScope(func(scope *sentry.Scope) {
scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
"room_id": events[0].RoomID,
"len_state": len(events),
})
sentry.CaptureMessage(errMsg)
})
logger.Warn().
Str("room_id", events[0].RoomID).
Int("len_state", len(events)).
Msg(errMsg)
// the HS gave us bad data so there's no point retrying => return DataError
return internal.NewDataError(errMsg)
}
return nil
}

type stateMap struct {
// state_key (user id) -> NID
Memberships map[string]int64
// type, state_key -> NID
Other map[[2]string]int64
}

func (s *stateMap) Ingest(e Event) (replacedNID int64) {
if e.Type == "m.room.member" {
replacedNID = s.Memberships[e.StateKey]
s.Memberships[e.StateKey] = e.NID
} else {
key := [2]string{e.Type, e.StateKey}
replacedNID = s.Other[key]
s.Other[key] = e.NID
}
return
}

func (s *stateMap) NIDs() (membershipNIDs, otherNIDs []int64) {
membershipNIDs = make([]int64, 0, len(s.Memberships))
otherNIDs = make([]int64, 0, len(s.Other))
for _, nid := range s.Memberships {
membershipNIDs = append(membershipNIDs, nid)
}
for _, nid := range s.Other {
otherNIDs = append(otherNIDs, nid)
}
return
}

func (a *Accumulator) stateMapAtSnapshot(txn *sqlx.Tx, snapID int64) (stateMap, error) {
snapshot, err := a.snapshotTable.Select(txn, snapID)
if err != nil {
return stateMap{}, err
}
// pull stripped events as this may be huge (think Matrix HQ)
events, err := a.eventsTable.SelectStrippedEventsByNIDs(txn, true, append(snapshot.MembershipEvents, snapshot.OtherEvents...))
if err != nil {
return stateMap{}, err
}

state := stateMap{
Memberships: make(map[string]int64, len(snapshot.MembershipEvents)),
Other: make(map[[2]string]int64, len(snapshot.OtherEvents)),
}
for _, e := range events {
state.Ingest(e)
}
return state, nil
}
Loading
Loading