diff options
author | Kegsay <kegan@matrix.org> | 2020-04-28 11:46:47 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-28 11:46:47 +0100 |
commit | 6d832ae544a6221eb01dc7bad170d3b25a534a1e (patch) | |
tree | ad0eda5ec31154e83188fec4cbc49252cb85b505 /roomserver/query/query.go | |
parent | 3a858afca2368f588b2681de4f4816f26686f540 (diff) |
Implement backfill in the roomserver (#983)
* Initial cut for backfilling
The syncserver now asks the roomserver via QueryBackfill (which already
existed to *handle* backfill requests) which then makes federation requests
via gomatrixserverlib.RequestBackfill.
Currently, tests fail on subsequent /messages requests because we don't know
which servers are in the room, because we are unable to get state snapshots
from a backfilled event because that code doesn't exist yet.
* WIP backfill, doesn't work
* Make initial backfill pass checks
* Persist backfilled events with state snapshots
* Remove debug lines
* Linting
* Review comments
Diffstat (limited to 'roomserver/query/query.go')
-rw-r--r-- | roomserver/query/query.go | 147 |
1 files changed, 94 insertions, 53 deletions
diff --git a/roomserver/query/query.go b/roomserver/query/query.go index 7508d790..a54fa58d 100644 --- a/roomserver/query/query.go +++ b/roomserver/query/query.go @@ -19,6 +19,7 @@ package query import ( "context" "encoding/json" + "fmt" "net/http" "github.com/matrix-org/dendrite/common" @@ -31,12 +32,16 @@ import ( "github.com/matrix-org/dendrite/roomserver/version" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/sirupsen/logrus" ) // RoomserverQueryAPI is an implementation of api.RoomserverQueryAPI type RoomserverQueryAPI struct { DB storage.Database ImmutableCache caching.ImmutableCache + ServerName gomatrixserverlib.ServerName + KeyRing gomatrixserverlib.JSONVerifier + FedClient *gomatrixserverlib.FederationClient } // QueryLatestEventsAndState implements api.RoomserverQueryAPI @@ -281,7 +286,7 @@ func (r *RoomserverQueryAPI) QueryMembershipsForRoom( events, err = r.DB.Events(ctx, eventNIDs) } else { - events, err = r.getMembershipsBeforeEventNID(ctx, membershipEventNID, request.JoinedOnly) + events, err = getMembershipsBeforeEventNID(ctx, r.DB, membershipEventNID, request.JoinedOnly) } if err != nil { @@ -300,19 +305,19 @@ func (r *RoomserverQueryAPI) QueryMembershipsForRoom( // of the event's room as it was when this event was fired, then filters the state events to // only keep the "m.room.member" events with a "join" membership. These events are returned. // Returns an error if there was an issue fetching the events. -func (r *RoomserverQueryAPI) getMembershipsBeforeEventNID( - ctx context.Context, eventNID types.EventNID, joinedOnly bool, +func getMembershipsBeforeEventNID( + ctx context.Context, db storage.Database, eventNID types.EventNID, joinedOnly bool, ) ([]types.Event, error) { - roomState := state.NewStateResolution(r.DB) + roomState := state.NewStateResolution(db) events := []types.Event{} // Lookup the event NID - eIDs, err := r.DB.EventIDs(ctx, []types.EventNID{eventNID}) + eIDs, err := db.EventIDs(ctx, []types.EventNID{eventNID}) if err != nil { return nil, err } eventIDs := []string{eIDs[eventNID]} - prevState, err := r.DB.StateAtEventIDs(ctx, eventIDs) + prevState, err := db.StateAtEventIDs(ctx, eventIDs) if err != nil { return nil, err } @@ -332,7 +337,7 @@ func (r *RoomserverQueryAPI) getMembershipsBeforeEventNID( } // Get all of the events in this state - stateEvents, err := r.DB.Events(ctx, eventNIDs) + stateEvents, err := db.Events(ctx, eventNIDs) if err != nil { return nil, err } @@ -484,6 +489,13 @@ func (r *RoomserverQueryAPI) QueryBackfill( request *api.QueryBackfillRequest, response *api.QueryBackfillResponse, ) error { + // if we are requesting the backfill then we need to do a federation hit + // TODO: we could be more sensible and fetch as many events we already have then request the rest + // which is what the syncapi does already. + if request.ServerName == r.ServerName { + return r.backfillViaFederation(ctx, request, response) + } + // someone else is requesting the backfill, try to service their request. var err error var front []string @@ -525,6 +537,55 @@ func (r *RoomserverQueryAPI) QueryBackfill( return err } +func (r *RoomserverQueryAPI) backfillViaFederation(ctx context.Context, req *api.QueryBackfillRequest, res *api.QueryBackfillResponse) error { + roomVer, err := r.DB.GetRoomVersionForRoom(ctx, req.RoomID) + if err != nil { + return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err) + } + requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName) + events, err := gomatrixserverlib.RequestBackfill( + ctx, requester, + r.KeyRing, req.RoomID, roomVer, req.EarliestEventsIDs, req.Limit) + if err != nil { + return err + } + logrus.WithField("room_id", req.RoomID).Infof("backfilled %d events", len(events)) + + // persist these new events - auth checks have already been done + roomNID, backfilledEventMap := persistEvents(ctx, r.DB, events) + if err != nil { + return err + } + + for _, ev := range backfilledEventMap { + // now add state for these events + stateIDs, ok := requester.eventIDToBeforeStateIDs[ev.EventID()] + if !ok { + // this should be impossible as all events returned must have pass Step 5 of the PDU checks + // which requires a list of state IDs. + logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to find state IDs for event which passed auth checks") + continue + } + var entries []types.StateEntry + if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs); err != nil { + return err + } + + var beforeStateSnapshotNID types.StateSnapshotNID + if beforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, entries); err != nil { + return err + } + if err = r.DB.SetState(ctx, ev.EventNID, beforeStateSnapshotNID); err != nil { + logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to set state before event") + } + } + + // TODO: update backwards extremities, as that should be moved from syncapi to roomserver at some point. + + res.Events = events + return nil +} + func (r *RoomserverQueryAPI) isServerCurrentlyInRoom(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID string) (bool, error) { roomNID, err := r.DB.RoomNID(ctx, roomID) if err != nil { @@ -778,39 +839,33 @@ func getAuthChain( return authEvents, nil } -// QueryServersInRoomAtEvent implements api.RoomserverQueryAPI -func (r *RoomserverQueryAPI) QueryServersInRoomAtEvent( - ctx context.Context, - request *api.QueryServersInRoomAtEventRequest, - response *api.QueryServersInRoomAtEventResponse, -) error { - // getMembershipsBeforeEventNID requires a NID, so retrieving the NID for - // the event is necessary. - NIDs, err := r.DB.EventNIDs(ctx, []string{request.EventID}) - if err != nil { - return err - } - - // Retrieve all "m.room.member" state events of "join" membership, which - // contains the list of users in the room before the event, therefore all - // the servers in it at that moment. - events, err := r.getMembershipsBeforeEventNID(ctx, NIDs[request.EventID], true) - if err != nil { - return err - } - - // Store the server names in a temporary map to avoid duplicates. - servers := make(map[gomatrixserverlib.ServerName]bool) - for _, event := range events { - servers[event.Origin()] = true - } - - // Populate the response. - for server := range servers { - response.Servers = append(response.Servers, server) +func persistEvents(ctx context.Context, db storage.Database, events []gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) { + var roomNID types.RoomNID + backfilledEventMap := make(map[string]types.Event) + for _, ev := range events { + nidMap, err := db.EventNIDs(ctx, ev.AuthEventIDs()) + if err != nil { // this shouldn't happen as RequestBackfill already found them + logrus.WithError(err).WithField("auth_events", ev.AuthEventIDs()).Error("Failed to find one or more auth events") + continue + } + authNids := make([]types.EventNID, len(nidMap)) + i := 0 + for _, nid := range nidMap { + authNids[i] = nid + i++ + } + var stateAtEvent types.StateAtEvent + roomNID, stateAtEvent, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids) + if err != nil { + logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to store backfilled event") + continue + } + backfilledEventMap[ev.EventID()] = types.Event{ + EventNID: stateAtEvent.StateEntry.EventNID, + Event: ev.Unwrap(), + } } - - return nil + return roomNID, backfilledEventMap } // QueryRoomVersionCapabilities implements api.RoomserverQueryAPI @@ -995,20 +1050,6 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { }), ) servMux.Handle( - api.RoomserverQueryServersInRoomAtEventPath, - common.MakeInternalAPI("QueryServersInRoomAtEvent", func(req *http.Request) util.JSONResponse { - var request api.QueryServersInRoomAtEventRequest - var response api.QueryServersInRoomAtEventResponse - if err := json.NewDecoder(req.Body).Decode(&request); err != nil { - return util.ErrorResponse(err) - } - if err := r.QueryServersInRoomAtEvent(req.Context(), &request, &response); err != nil { - return util.ErrorResponse(err) - } - return util.JSONResponse{Code: http.StatusOK, JSON: &response} - }), - ) - servMux.Handle( api.RoomserverQueryRoomVersionCapabilitiesPath, common.MakeInternalAPI("QueryRoomVersionCapabilities", func(req *http.Request) util.JSONResponse { var request api.QueryRoomVersionCapabilitiesRequest |