aboutsummaryrefslogtreecommitdiff
path: root/roomserver/query/query.go
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-04-28 11:46:47 +0100
committerGitHub <noreply@github.com>2020-04-28 11:46:47 +0100
commit6d832ae544a6221eb01dc7bad170d3b25a534a1e (patch)
treead0eda5ec31154e83188fec4cbc49252cb85b505 /roomserver/query/query.go
parent3a858afca2368f588b2681de4f4816f26686f540 (diff)
Implement backfill in the roomserver (#983)
* Initial cut for backfilling The syncserver now asks the roomserver via QueryBackfill (which already existed to *handle* backfill requests) which then makes federation requests via gomatrixserverlib.RequestBackfill. Currently, tests fail on subsequent /messages requests because we don't know which servers are in the room, because we are unable to get state snapshots from a backfilled event because that code doesn't exist yet. * WIP backfill, doesn't work * Make initial backfill pass checks * Persist backfilled events with state snapshots * Remove debug lines * Linting * Review comments
Diffstat (limited to 'roomserver/query/query.go')
-rw-r--r--roomserver/query/query.go147
1 files changed, 94 insertions, 53 deletions
diff --git a/roomserver/query/query.go b/roomserver/query/query.go
index 7508d790..a54fa58d 100644
--- a/roomserver/query/query.go
+++ b/roomserver/query/query.go
@@ -19,6 +19,7 @@ package query
import (
"context"
"encoding/json"
+ "fmt"
"net/http"
"github.com/matrix-org/dendrite/common"
@@ -31,12 +32,16 @@ import (
"github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
+ "github.com/sirupsen/logrus"
)
// RoomserverQueryAPI is an implementation of api.RoomserverQueryAPI
type RoomserverQueryAPI struct {
DB storage.Database
ImmutableCache caching.ImmutableCache
+ ServerName gomatrixserverlib.ServerName
+ KeyRing gomatrixserverlib.JSONVerifier
+ FedClient *gomatrixserverlib.FederationClient
}
// QueryLatestEventsAndState implements api.RoomserverQueryAPI
@@ -281,7 +286,7 @@ func (r *RoomserverQueryAPI) QueryMembershipsForRoom(
events, err = r.DB.Events(ctx, eventNIDs)
} else {
- events, err = r.getMembershipsBeforeEventNID(ctx, membershipEventNID, request.JoinedOnly)
+ events, err = getMembershipsBeforeEventNID(ctx, r.DB, membershipEventNID, request.JoinedOnly)
}
if err != nil {
@@ -300,19 +305,19 @@ func (r *RoomserverQueryAPI) QueryMembershipsForRoom(
// of the event's room as it was when this event was fired, then filters the state events to
// only keep the "m.room.member" events with a "join" membership. These events are returned.
// Returns an error if there was an issue fetching the events.
-func (r *RoomserverQueryAPI) getMembershipsBeforeEventNID(
- ctx context.Context, eventNID types.EventNID, joinedOnly bool,
+func getMembershipsBeforeEventNID(
+ ctx context.Context, db storage.Database, eventNID types.EventNID, joinedOnly bool,
) ([]types.Event, error) {
- roomState := state.NewStateResolution(r.DB)
+ roomState := state.NewStateResolution(db)
events := []types.Event{}
// Lookup the event NID
- eIDs, err := r.DB.EventIDs(ctx, []types.EventNID{eventNID})
+ eIDs, err := db.EventIDs(ctx, []types.EventNID{eventNID})
if err != nil {
return nil, err
}
eventIDs := []string{eIDs[eventNID]}
- prevState, err := r.DB.StateAtEventIDs(ctx, eventIDs)
+ prevState, err := db.StateAtEventIDs(ctx, eventIDs)
if err != nil {
return nil, err
}
@@ -332,7 +337,7 @@ func (r *RoomserverQueryAPI) getMembershipsBeforeEventNID(
}
// Get all of the events in this state
- stateEvents, err := r.DB.Events(ctx, eventNIDs)
+ stateEvents, err := db.Events(ctx, eventNIDs)
if err != nil {
return nil, err
}
@@ -484,6 +489,13 @@ func (r *RoomserverQueryAPI) QueryBackfill(
request *api.QueryBackfillRequest,
response *api.QueryBackfillResponse,
) error {
+ // if we are requesting the backfill then we need to do a federation hit
+ // TODO: we could be more sensible and fetch as many events we already have then request the rest
+ // which is what the syncapi does already.
+ if request.ServerName == r.ServerName {
+ return r.backfillViaFederation(ctx, request, response)
+ }
+ // someone else is requesting the backfill, try to service their request.
var err error
var front []string
@@ -525,6 +537,55 @@ func (r *RoomserverQueryAPI) QueryBackfill(
return err
}
+func (r *RoomserverQueryAPI) backfillViaFederation(ctx context.Context, req *api.QueryBackfillRequest, res *api.QueryBackfillResponse) error {
+ roomVer, err := r.DB.GetRoomVersionForRoom(ctx, req.RoomID)
+ if err != nil {
+ return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err)
+ }
+ requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName)
+ events, err := gomatrixserverlib.RequestBackfill(
+ ctx, requester,
+ r.KeyRing, req.RoomID, roomVer, req.EarliestEventsIDs, req.Limit)
+ if err != nil {
+ return err
+ }
+ logrus.WithField("room_id", req.RoomID).Infof("backfilled %d events", len(events))
+
+ // persist these new events - auth checks have already been done
+ roomNID, backfilledEventMap := persistEvents(ctx, r.DB, events)
+ if err != nil {
+ return err
+ }
+
+ for _, ev := range backfilledEventMap {
+ // now add state for these events
+ stateIDs, ok := requester.eventIDToBeforeStateIDs[ev.EventID()]
+ if !ok {
+ // this should be impossible as all events returned must have pass Step 5 of the PDU checks
+ // which requires a list of state IDs.
+ logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to find state IDs for event which passed auth checks")
+ continue
+ }
+ var entries []types.StateEntry
+ if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs); err != nil {
+ return err
+ }
+
+ var beforeStateSnapshotNID types.StateSnapshotNID
+ if beforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, entries); err != nil {
+ return err
+ }
+ if err = r.DB.SetState(ctx, ev.EventNID, beforeStateSnapshotNID); err != nil {
+ logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to set state before event")
+ }
+ }
+
+ // TODO: update backwards extremities, as that should be moved from syncapi to roomserver at some point.
+
+ res.Events = events
+ return nil
+}
+
func (r *RoomserverQueryAPI) isServerCurrentlyInRoom(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID string) (bool, error) {
roomNID, err := r.DB.RoomNID(ctx, roomID)
if err != nil {
@@ -778,39 +839,33 @@ func getAuthChain(
return authEvents, nil
}
-// QueryServersInRoomAtEvent implements api.RoomserverQueryAPI
-func (r *RoomserverQueryAPI) QueryServersInRoomAtEvent(
- ctx context.Context,
- request *api.QueryServersInRoomAtEventRequest,
- response *api.QueryServersInRoomAtEventResponse,
-) error {
- // getMembershipsBeforeEventNID requires a NID, so retrieving the NID for
- // the event is necessary.
- NIDs, err := r.DB.EventNIDs(ctx, []string{request.EventID})
- if err != nil {
- return err
- }
-
- // Retrieve all "m.room.member" state events of "join" membership, which
- // contains the list of users in the room before the event, therefore all
- // the servers in it at that moment.
- events, err := r.getMembershipsBeforeEventNID(ctx, NIDs[request.EventID], true)
- if err != nil {
- return err
- }
-
- // Store the server names in a temporary map to avoid duplicates.
- servers := make(map[gomatrixserverlib.ServerName]bool)
- for _, event := range events {
- servers[event.Origin()] = true
- }
-
- // Populate the response.
- for server := range servers {
- response.Servers = append(response.Servers, server)
+func persistEvents(ctx context.Context, db storage.Database, events []gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) {
+ var roomNID types.RoomNID
+ backfilledEventMap := make(map[string]types.Event)
+ for _, ev := range events {
+ nidMap, err := db.EventNIDs(ctx, ev.AuthEventIDs())
+ if err != nil { // this shouldn't happen as RequestBackfill already found them
+ logrus.WithError(err).WithField("auth_events", ev.AuthEventIDs()).Error("Failed to find one or more auth events")
+ continue
+ }
+ authNids := make([]types.EventNID, len(nidMap))
+ i := 0
+ for _, nid := range nidMap {
+ authNids[i] = nid
+ i++
+ }
+ var stateAtEvent types.StateAtEvent
+ roomNID, stateAtEvent, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids)
+ if err != nil {
+ logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to store backfilled event")
+ continue
+ }
+ backfilledEventMap[ev.EventID()] = types.Event{
+ EventNID: stateAtEvent.StateEntry.EventNID,
+ Event: ev.Unwrap(),
+ }
}
-
- return nil
+ return roomNID, backfilledEventMap
}
// QueryRoomVersionCapabilities implements api.RoomserverQueryAPI
@@ -995,20 +1050,6 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
}),
)
servMux.Handle(
- api.RoomserverQueryServersInRoomAtEventPath,
- common.MakeInternalAPI("QueryServersInRoomAtEvent", func(req *http.Request) util.JSONResponse {
- var request api.QueryServersInRoomAtEventRequest
- var response api.QueryServersInRoomAtEventResponse
- if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
- return util.ErrorResponse(err)
- }
- if err := r.QueryServersInRoomAtEvent(req.Context(), &request, &response); err != nil {
- return util.ErrorResponse(err)
- }
- return util.JSONResponse{Code: http.StatusOK, JSON: &response}
- }),
- )
- servMux.Handle(
api.RoomserverQueryRoomVersionCapabilitiesPath,
common.MakeInternalAPI("QueryRoomVersionCapabilities", func(req *http.Request) util.JSONResponse {
var request api.QueryRoomVersionCapabilitiesRequest