diff options
Diffstat (limited to 'roomserver/internal')
-rw-r--r-- | roomserver/internal/input.go | 4 | ||||
-rw-r--r-- | roomserver/internal/input_events.go | 53 | ||||
-rw-r--r-- | roomserver/internal/input_latest_events.go | 23 | ||||
-rw-r--r-- | roomserver/internal/input_membership.go | 24 | ||||
-rw-r--r-- | roomserver/internal/query.go | 4 | ||||
-rw-r--r-- | roomserver/internal/query_backfill.go | 2 |
6 files changed, 56 insertions, 54 deletions
diff --git a/roomserver/internal/input.go b/roomserver/internal/input.go index ab3d7516..932b4df4 100644 --- a/roomserver/internal/input.go +++ b/roomserver/internal/input.go @@ -60,7 +60,7 @@ func (r *RoomserverInternalAPI) InputRoomEvents( defer r.mutex.Unlock() for i := range request.InputInviteEvents { var loopback *api.InputRoomEvent - if loopback, err = processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]); err != nil { + if loopback, err = r.processInviteEvent(ctx, r, request.InputInviteEvents[i]); err != nil { return err } // The processInviteEvent function can optionally return a @@ -71,7 +71,7 @@ func (r *RoomserverInternalAPI) InputRoomEvents( } } for i := range request.InputRoomEvents { - if response.EventID, err = processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil { + if response.EventID, err = r.processRoomEvent(ctx, request.InputRoomEvents[i]); err != nil { return err } } diff --git a/roomserver/internal/input_events.go b/roomserver/internal/input_events.go index f5c678ca..a4167714 100644 --- a/roomserver/internal/input_events.go +++ b/roomserver/internal/input_events.go @@ -31,21 +31,13 @@ import ( log "github.com/sirupsen/logrus" ) -// OutputRoomEventWriter has the APIs needed to write an event to the output logs. -type OutputRoomEventWriter interface { - // Write a list of events for a room - WriteOutputEvents(roomID string, updates []api.OutputEvent) error -} - // processRoomEvent can only be called once at a time // // TODO(#375): This should be rewritten to allow concurrent calls. The // difficulty is in ensuring that we correctly annotate events with the correct // state deltas when sending to kafka streams -func processRoomEvent( +func (r *RoomserverInternalAPI) processRoomEvent( ctx context.Context, - db storage.Database, - ow OutputRoomEventWriter, input api.InputRoomEvent, ) (eventID string, err error) { // Parse and validate the event JSON @@ -54,7 +46,7 @@ func processRoomEvent( // Check that the event passes authentication checks and work out // the numeric IDs for the auth events. - authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs) + authEventNIDs, err := checkAuthEvents(ctx, r.DB, headered, input.AuthEventIDs) if err != nil { logrus.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("processRoomEvent.checkAuthEvents failed for event") return @@ -63,7 +55,7 @@ func processRoomEvent( // If we don't have a transaction ID then get one. if input.TransactionID != nil { tdID := input.TransactionID - eventID, err = db.GetTransactionEventID( + eventID, err = r.DB.GetTransactionEventID( ctx, tdID.TransactionID, tdID.SessionID, event.Sender(), ) // On error OR event with the transaction already processed/processesing @@ -73,7 +65,7 @@ func processRoomEvent( } // Store the event. - roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs) + roomNID, stateAtEvent, err := r.DB.StoreEvent(ctx, event, input.TransactionID, authEventNIDs) if err != nil { return } @@ -93,16 +85,14 @@ func processRoomEvent( if stateAtEvent.BeforeStateSnapshotNID == 0 { // We haven't calculated a state for this event yet. // Lets calculate one. - err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event) + err = r.calculateAndSetState(ctx, input, roomNID, &stateAtEvent, event) if err != nil { return } } - if err = updateLatestEvents( + if err = r.updateLatestEvents( ctx, // context - db, // roomserver database - ow, // output event writer roomNID, // room NID to update stateAtEvent, // state at event (below) event, // event @@ -116,29 +106,36 @@ func processRoomEvent( return event.EventID(), nil } -func calculateAndSetState( +func (r *RoomserverInternalAPI) calculateAndSetState( ctx context.Context, - db storage.Database, input api.InputRoomEvent, roomNID types.RoomNID, stateAtEvent *types.StateAtEvent, event gomatrixserverlib.Event, ) error { var err error - roomState := state.NewStateResolution(db) + roomState := state.NewStateResolution(r.DB) if input.HasState { - // TODO: Check here if we think we're in the room already. + // Check here if we think we're in the room already. stateAtEvent.Overwrite = true + var joinEventNIDs []types.EventNID + // Request join memberships only for local users only. + if joinEventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, true, true); err == nil { + // If we have no local users that are joined to the room then any state about + // the room that we have is quite possibly out of date. Therefore in that case + // we should overwrite it rather than merge it. + stateAtEvent.Overwrite = len(joinEventNIDs) == 0 + } // We've been told what the state at the event is so we don't need to calculate it. // Check that those state events are in the database and store the state. var entries []types.StateEntry - if entries, err = db.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil { + if entries, err = r.DB.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil { return err } - if stateAtEvent.BeforeStateSnapshotNID, err = db.AddState(ctx, roomNID, nil, entries); err != nil { + if stateAtEvent.BeforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, entries); err != nil { return err } } else { @@ -149,12 +146,11 @@ func calculateAndSetState( return err } } - return db.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID) + return r.DB.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID) } -func processInviteEvent( +func (r *RoomserverInternalAPI) processInviteEvent( ctx context.Context, - db storage.Database, ow *RoomserverInternalAPI, input api.InputInviteEvent, ) (*api.InputRoomEvent, error) { @@ -172,7 +168,10 @@ func processInviteEvent( "target_user_id": targetUserID, }).Info("processing invite event") - updater, err := db.MembershipUpdater(ctx, roomID, targetUserID, input.RoomVersion) + _, domain, _ := gomatrixserverlib.SplitID('@', targetUserID) + isTargetLocalUser := domain == r.Cfg.Matrix.ServerName + + updater, err := r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocalUser, input.RoomVersion) if err != nil { return nil, err } @@ -239,7 +238,7 @@ func processInviteEvent( // up from local data (which is most likely to be if the event came // from the CS API). If we know about the room then we can insert // the invite room state, if we don't then we just fail quietly. - if irs, ierr := buildInviteStrippedState(ctx, db, input); ierr == nil { + if irs, ierr := buildInviteStrippedState(ctx, r.DB, input); ierr == nil { if err = event.SetUnsignedField("invite_room_state", irs); err != nil { return nil, err } diff --git a/roomserver/internal/input_latest_events.go b/roomserver/internal/input_latest_events.go index 6eeeedab..d7c9a5cb 100644 --- a/roomserver/internal/input_latest_events.go +++ b/roomserver/internal/input_latest_events.go @@ -23,7 +23,6 @@ import ( "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/state" - "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -46,17 +45,15 @@ import ( // 7 <----- latest // // Can only be called once at a time -func updateLatestEvents( +func (r *RoomserverInternalAPI) updateLatestEvents( ctx context.Context, - db storage.Database, - ow OutputRoomEventWriter, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event, sendAsServer string, transactionID *api.TransactionID, ) (err error) { - updater, err := db.GetLatestEventsForUpdate(ctx, roomNID) + updater, err := r.DB.GetLatestEventsForUpdate(ctx, roomNID) if err != nil { return } @@ -70,9 +67,8 @@ func updateLatestEvents( u := latestEventsUpdater{ ctx: ctx, - db: db, + api: r, updater: updater, - ow: ow, roomNID: roomNID, stateAtEvent: stateAtEvent, event: event, @@ -94,9 +90,8 @@ func updateLatestEvents( // when there are so many variables to pass around. type latestEventsUpdater struct { ctx context.Context - db storage.Database + api *RoomserverInternalAPI updater types.RoomRecentEventsUpdater - ow OutputRoomEventWriter roomNID types.RoomNID stateAtEvent types.StateAtEvent event gomatrixserverlib.Event @@ -181,7 +176,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // If we need to generate any output events then here's where we do it. // TODO: Move this! - updates, err := updateMemberships(u.ctx, u.db, u.updater, u.removed, u.added) + updates, err := u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added) if err != nil { return err } @@ -200,7 +195,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // send the event asynchronously but we would need to ensure that 1) the events are written to the log in // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the // necessary bookkeeping we'll keep the event sending synchronous for now. - if err = u.ow.WriteOutputEvents(u.event.RoomID(), updates); err != nil { + if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil { return err } @@ -213,7 +208,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { func (u *latestEventsUpdater) latestState() error { var err error - roomState := state.NewStateResolution(u.db) + roomState := state.NewStateResolution(u.api.DB) // Get a list of the current latest events. latestStateAtEvents := make([]types.StateAtEvent, len(u.latest)) @@ -303,7 +298,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) latestEventIDs[i] = u.latest[i].EventID } - roomVersion, err := u.db.GetRoomVersionForRoom(u.ctx, u.event.RoomID()) + roomVersion, err := u.api.DB.GetRoomVersionForRoom(u.ctx, u.event.RoomID()) if err != nil { return nil, err } @@ -329,7 +324,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) stateEventNIDs = append(stateEventNIDs, entry.EventNID) } stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))] - eventIDMap, err := u.db.EventIDs(u.ctx, stateEventNIDs) + eventIDMap, err := u.api.DB.EventIDs(u.ctx, stateEventNIDs) if err != nil { return nil, err } diff --git a/roomserver/internal/input_membership.go b/roomserver/internal/input_membership.go index 666e7ebc..af0c7f8b 100644 --- a/roomserver/internal/input_membership.go +++ b/roomserver/internal/input_membership.go @@ -19,7 +19,6 @@ import ( "fmt" "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -28,9 +27,8 @@ import ( // user affected by a change in the current state of the room. // Returns a list of output events to write to the kafka log to inform the // consumers about the invites added or retired by the change in current state. -func updateMemberships( +func (r *RoomserverInternalAPI) updateMemberships( ctx context.Context, - db storage.Database, updater types.RoomRecentEventsUpdater, removed, added []types.StateEntry, ) ([]api.OutputEvent, error) { @@ -48,7 +46,7 @@ func updateMemberships( // Load the event JSON so we can look up the "membership" key. // TODO: Maybe add a membership key to the events table so we can load that // key without having to load the entire event JSON? - events, err := db.Events(ctx, eventNIDs) + events, err := r.DB.Events(ctx, eventNIDs) if err != nil { return nil, err } @@ -71,15 +69,16 @@ func updateMemberships( ae = &ev.Event } } - if updates, err = updateMembership(updater, targetUserNID, re, ae, updates); err != nil { + if updates, err = r.updateMembership(updater, targetUserNID, re, ae, updates); err != nil { return nil, err } } return updates, nil } -func updateMembership( - updater types.RoomRecentEventsUpdater, targetUserNID types.EventStateKeyNID, +func (r *RoomserverInternalAPI) updateMembership( + updater types.RoomRecentEventsUpdater, + targetUserNID types.EventStateKeyNID, remove, add *gomatrixserverlib.Event, updates []api.OutputEvent, ) ([]api.OutputEvent, error) { @@ -113,7 +112,7 @@ func updateMembership( return updates, nil } - mu, err := updater.MembershipUpdater(targetUserNID) + mu, err := updater.MembershipUpdater(targetUserNID, r.isLocalTarget(add)) if err != nil { return nil, err } @@ -132,6 +131,15 @@ func updateMembership( } } +func (r *RoomserverInternalAPI) isLocalTarget(event *gomatrixserverlib.Event) bool { + isTargetLocalUser := false + if statekey := event.StateKey(); statekey != nil { + _, domain, _ := gomatrixserverlib.SplitID('@', *statekey) + isTargetLocalUser = domain == r.Cfg.Matrix.ServerName + } + return isTargetLocalUser +} + func updateToInviteMembership( mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, roomVersion gomatrixserverlib.RoomVersion, diff --git a/roomserver/internal/query.go b/roomserver/internal/query.go index 2d1c21c5..fce2ae90 100644 --- a/roomserver/internal/query.go +++ b/roomserver/internal/query.go @@ -267,7 +267,7 @@ func (r *RoomserverInternalAPI) QueryMembershipsForRoom( var stateEntries []types.StateEntry if stillInRoom { var eventNIDs []types.EventNID - eventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, request.JoinedOnly) + eventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, request.JoinedOnly, false) if err != nil { return err } @@ -591,7 +591,7 @@ func (r *RoomserverInternalAPI) isServerCurrentlyInRoom(ctx context.Context, ser return false, err } - eventNIDs, err := r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, true) + eventNIDs, err := r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, true, false) if err != nil { return false, err } diff --git a/roomserver/internal/query_backfill.go b/roomserver/internal/query_backfill.go index 49e0af34..23ae9455 100644 --- a/roomserver/internal/query_backfill.go +++ b/roomserver/internal/query_backfill.go @@ -297,7 +297,7 @@ func joinEventsFromHistoryVisibility( if err != nil { return nil, err } - joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, roomNID, true) + joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, roomNID, true, false) if err != nil { return nil, err } |