From ec7718e7f842fa0fc5198489c904de21003db4c2 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Thu, 11 Jun 2020 19:50:40 +0100 Subject: Roomserver API changes (#1118) * s/QueryBackfill/PerformBackfill/g * OutputEvent now includes AddStateEvents which contain the full event of extra state events * Only include adds not the current event * Get adding state right --- roomserver/api/api.go | 6 +- roomserver/api/output.go | 27 +++ roomserver/api/perform.go | 29 +++ roomserver/api/query.go | 29 --- roomserver/internal/input_latest_events.go | 66 +++++-- roomserver/internal/perform_backfill.go | 305 +++++++++++++++++++++++++++++ roomserver/internal/query.go | 12 +- roomserver/internal/query_backfill.go | 305 ----------------------------- roomserver/inthttp/client.go | 18 +- roomserver/inthttp/server.go | 10 +- 10 files changed, 435 insertions(+), 372 deletions(-) create mode 100644 roomserver/internal/perform_backfill.go delete mode 100644 roomserver/internal/query_backfill.go (limited to 'roomserver') diff --git a/roomserver/api/api.go b/roomserver/api/api.go index 3a2ad059..967f58ba 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -89,10 +89,10 @@ type RoomserverInternalAPI interface { ) error // Query a given amount (or less) of events prior to a given set of events. - QueryBackfill( + PerformBackfill( ctx context.Context, - request *QueryBackfillRequest, - response *QueryBackfillResponse, + request *PerformBackfillRequest, + response *PerformBackfillResponse, ) error // Asks for the default room version as preferred by the server. diff --git a/roomserver/api/output.go b/roomserver/api/output.go index 92a468a9..2bbd97af 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -63,6 +63,13 @@ type OutputNewRoomEvent struct { // Together with RemovesStateEventIDs this allows the receiver to keep an up to date // view of the current state of the room. AddsStateEventIDs []string `json:"adds_state_event_ids"` + // All extra newly added state events. This is only set if there are *extra* events + // other than `Event`. This can happen when forks get merged because state resolution + // may decide a bunch of state events on one branch are now valid, so they will be + // present in this list. This is useful when trying to maintain the current state of a room + // as to do so you need to include both these events and `Event`. + AddStateEvents []gomatrixserverlib.HeaderedEvent `json:"adds_state_events"` + // The state event IDs that were removed from the state of the room by this event. RemovesStateEventIDs []string `json:"removes_state_event_ids"` // The ID of the event that was output before this event. @@ -112,6 +119,26 @@ type OutputNewRoomEvent struct { TransactionID *TransactionID `json:"transaction_id"` } +// AddsState returns all added state events from this event. +// +// This function is needed because `AddStateEvents` will not include a copy of +// the original event to save space, so you cannot use that slice alone. +// Instead, use this function which will add the original event if it is present +// in `AddsStateEventIDs`. +func (ore *OutputNewRoomEvent) AddsState() []gomatrixserverlib.HeaderedEvent { + includeOutputEvent := false + for _, id := range ore.AddsStateEventIDs { + if id == ore.Event.EventID() { + includeOutputEvent = true + break + } + } + if !includeOutputEvent { + return ore.AddStateEvents + } + return append(ore.AddStateEvents, ore.Event) +} + // An OutputNewInviteEvent is written whenever an invite becomes active. // Invite events can be received outside of an existing room so have to be // tracked separately from the room events themselves. diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go index 1cf54144..3e5cae1b 100644 --- a/roomserver/api/perform.go +++ b/roomserver/api/perform.go @@ -2,6 +2,7 @@ package api import ( "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" ) type PerformJoinRequest struct { @@ -22,3 +23,31 @@ type PerformLeaveRequest struct { type PerformLeaveResponse struct { } + +// PerformBackfillRequest is a request to PerformBackfill. +type PerformBackfillRequest struct { + // The room to backfill + RoomID string `json:"room_id"` + // A map of backwards extremity event ID to a list of its prev_event IDs. + BackwardsExtremities map[string][]string `json:"backwards_extremities"` + // The maximum number of events to retrieve. + Limit int `json:"limit"` + // The server interested in the events. + ServerName gomatrixserverlib.ServerName `json:"server_name"` +} + +// PrevEventIDs returns the prev_event IDs of all backwards extremities, de-duplicated in a lexicographically sorted order. +func (r *PerformBackfillRequest) PrevEventIDs() []string { + var prevEventIDs []string + for _, pes := range r.BackwardsExtremities { + prevEventIDs = append(prevEventIDs, pes...) + } + prevEventIDs = util.UniqueStrings(prevEventIDs) + return prevEventIDs +} + +// PerformBackfillResponse is a response to PerformBackfill. +type PerformBackfillResponse struct { + // Missing events, arbritrary order. + Events []gomatrixserverlib.HeaderedEvent `json:"events"` +} diff --git a/roomserver/api/query.go b/roomserver/api/query.go index c9a46ae9..b1525342 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -18,7 +18,6 @@ package api import ( "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" ) // QueryLatestEventsAndStateRequest is a request to QueryLatestEventsAndState @@ -204,34 +203,6 @@ type QueryStateAndAuthChainResponse struct { AuthChainEvents []gomatrixserverlib.HeaderedEvent `json:"auth_chain_events"` } -// QueryBackfillRequest is a request to QueryBackfill. -type QueryBackfillRequest struct { - // The room to backfill - RoomID string `json:"room_id"` - // A map of backwards extremity event ID to a list of its prev_event IDs. - BackwardsExtremities map[string][]string `json:"backwards_extremities"` - // The maximum number of events to retrieve. - Limit int `json:"limit"` - // The server interested in the events. - ServerName gomatrixserverlib.ServerName `json:"server_name"` -} - -// PrevEventIDs returns the prev_event IDs of all backwards extremities, de-duplicated in a lexicographically sorted order. -func (r *QueryBackfillRequest) PrevEventIDs() []string { - var prevEventIDs []string - for _, pes := range r.BackwardsExtremities { - prevEventIDs = append(prevEventIDs, pes...) - } - prevEventIDs = util.UniqueStrings(prevEventIDs) - return prevEventIDs -} - -// QueryBackfillResponse is a response to QueryBackfill. -type QueryBackfillResponse struct { - // Missing events, arbritrary order. - Events []gomatrixserverlib.HeaderedEvent `json:"events"` -} - // QueryRoomVersionCapabilitiesRequest asks for the default room version type QueryRoomVersionCapabilitiesRequest struct{} diff --git a/roomserver/internal/input_latest_events.go b/roomserver/internal/input_latest_events.go index aea85ca9..e69307ad 100644 --- a/roomserver/internal/input_latest_events.go +++ b/roomserver/internal/input_latest_events.go @@ -19,6 +19,7 @@ package internal import ( "bytes" "context" + "fmt" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" @@ -310,24 +311,11 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) TransactionID: u.transactionID, } - var stateEventNIDs []types.EventNID - for _, entry := range u.added { - stateEventNIDs = append(stateEventNIDs, entry.EventNID) - } - for _, entry := range u.removed { - stateEventNIDs = append(stateEventNIDs, entry.EventNID) - } - for _, entry := range u.stateBeforeEventRemoves { - stateEventNIDs = append(stateEventNIDs, entry.EventNID) - } - for _, entry := range u.stateBeforeEventAdds { - stateEventNIDs = append(stateEventNIDs, entry.EventNID) - } - stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))] - eventIDMap, err := u.api.DB.EventIDs(u.ctx, stateEventNIDs) + eventIDMap, err := u.stateEventMap() if err != nil { return nil, err } + for _, entry := range u.added { ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID]) } @@ -342,12 +330,60 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) } ore.SendAsServer = u.sendAsServer + // include extra state events if they were added as nearly every downstream component will care about it + // and we'd rather not have them all hit QueryEventsByID at the same time! + if len(ore.AddsStateEventIDs) > 0 { + ore.AddStateEvents, err = u.extraEventsForIDs(roomVersion, ore.AddsStateEventIDs) + if err != nil { + return nil, fmt.Errorf("failed to load add_state_events from db: %w", err) + } + } + return &api.OutputEvent{ Type: api.OutputTypeNewRoomEvent, NewRoomEvent: &ore, }, nil } +// extraEventsForIDs returns the full events for the event IDs given, but does not include the current event being +// updated. +func (u *latestEventsUpdater) extraEventsForIDs(roomVersion gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) { + var extraEventIDs []string + for _, e := range eventIDs { + if e == u.event.EventID() { + continue + } + extraEventIDs = append(extraEventIDs, e) + } + if len(extraEventIDs) == 0 { + return nil, nil + } + extraEvents, err := u.api.DB.EventsFromIDs(u.ctx, extraEventIDs) + if err != nil { + return nil, err + } + var h []gomatrixserverlib.HeaderedEvent + for _, e := range extraEvents { + h = append(h, e.Headered(roomVersion)) + } + return h, nil +} + +// retrieve an event nid -> event ID map for all events that need updating +func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error) { + var stateEventNIDs []types.EventNID + var allStateEntries []types.StateEntry + allStateEntries = append(allStateEntries, u.added...) + allStateEntries = append(allStateEntries, u.removed...) + allStateEntries = append(allStateEntries, u.stateBeforeEventRemoves...) + allStateEntries = append(allStateEntries, u.stateBeforeEventAdds...) + for _, entry := range allStateEntries { + stateEventNIDs = append(stateEventNIDs, entry.EventNID) + } + stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))] + return u.api.DB.EventIDs(u.ctx, stateEventNIDs) +} + type eventNIDSorter []types.EventNID func (s eventNIDSorter) Len() int { return len(s) } diff --git a/roomserver/internal/perform_backfill.go b/roomserver/internal/perform_backfill.go new file mode 100644 index 00000000..23ae9455 --- /dev/null +++ b/roomserver/internal/perform_backfill.go @@ -0,0 +1,305 @@ +package internal + +import ( + "context" + + "github.com/matrix-org/dendrite/roomserver/auth" + "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/sirupsen/logrus" +) + +// backfillRequester implements gomatrixserverlib.BackfillRequester +type backfillRequester struct { + db storage.Database + fedClient *gomatrixserverlib.FederationClient + thisServer gomatrixserverlib.ServerName + bwExtrems map[string][]string + + // per-request state + servers []gomatrixserverlib.ServerName + eventIDToBeforeStateIDs map[string][]string + eventIDMap map[string]gomatrixserverlib.Event +} + +func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName, bwExtrems map[string][]string) *backfillRequester { + return &backfillRequester{ + db: db, + fedClient: fedClient, + thisServer: thisServer, + eventIDToBeforeStateIDs: make(map[string][]string), + eventIDMap: make(map[string]gomatrixserverlib.Event), + bwExtrems: bwExtrems, + } +} + +func (b *backfillRequester) StateIDsBeforeEvent(ctx context.Context, targetEvent gomatrixserverlib.HeaderedEvent) ([]string, error) { + b.eventIDMap[targetEvent.EventID()] = targetEvent.Unwrap() + if ids, ok := b.eventIDToBeforeStateIDs[targetEvent.EventID()]; ok { + return ids, nil + } + if len(targetEvent.PrevEventIDs()) == 0 && targetEvent.Type() == "m.room.create" && targetEvent.StateKeyEquals("") { + util.GetLogger(ctx).WithField("room_id", targetEvent.RoomID()).Info("Backfilled to the beginning of the room") + b.eventIDToBeforeStateIDs[targetEvent.EventID()] = []string{} + return nil, nil + } + // if we have exactly 1 prev event and we know the state of the room at that prev event, then just roll forward the prev event. + // Else, we have to hit /state_ids because either we don't know the state at all at this event (new backwards extremity) or + // we don't know the result of state res to merge forks (2 or more prev_events) + if len(targetEvent.PrevEventIDs()) == 1 { + prevEventID := targetEvent.PrevEventIDs()[0] + prevEvent, ok := b.eventIDMap[prevEventID] + if !ok { + goto FederationHit + } + prevEventStateIDs, ok := b.eventIDToBeforeStateIDs[prevEventID] + if !ok { + goto FederationHit + } + newStateIDs := b.calculateNewStateIDs(targetEvent.Unwrap(), prevEvent, prevEventStateIDs) + if newStateIDs != nil { + b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs + return newStateIDs, nil + } + // else we failed to calculate the new state, so fallthrough + } + +FederationHit: + var lastErr error + logrus.WithField("event_id", targetEvent.EventID()).Info("Requesting /state_ids at event") + for _, srv := range b.servers { // hit any valid server + c := gomatrixserverlib.FederatedStateProvider{ + FedClient: b.fedClient, + RememberAuthEvents: false, + Server: srv, + } + res, err := c.StateIDsBeforeEvent(ctx, targetEvent) + if err != nil { + lastErr = err + continue + } + b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res + return res, nil + } + return nil, lastErr +} + +func (b *backfillRequester) calculateNewStateIDs(targetEvent, prevEvent gomatrixserverlib.Event, prevEventStateIDs []string) []string { + newStateIDs := prevEventStateIDs[:] + if prevEvent.StateKey() == nil { + // state is the same as the previous event + b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs + return newStateIDs + } + + missingState := false // true if we are missing the info for a state event ID + foundEvent := false // true if we found a (type, state_key) match + // find which state ID to replace, if any + for i, id := range newStateIDs { + ev, ok := b.eventIDMap[id] + if !ok { + missingState = true + continue + } + // The state IDs BEFORE the target event are the state IDs BEFORE the prev_event PLUS the prev_event itself + if ev.Type() == prevEvent.Type() && ev.StateKey() != nil && *ev.StateKey() == *prevEvent.StateKey() { + newStateIDs[i] = prevEvent.EventID() + foundEvent = true + break + } + } + if !foundEvent && !missingState { + // we can be certain that this is new state + newStateIDs = append(newStateIDs, prevEvent.EventID()) + foundEvent = true + } + + if foundEvent { + b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs + return newStateIDs + } + return nil +} + +func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatrixserverlib.RoomVersion, + event gomatrixserverlib.HeaderedEvent, eventIDs []string) (map[string]*gomatrixserverlib.Event, error) { + + // try to fetch the events from the database first + events, err := b.ProvideEvents(roomVer, eventIDs) + if err != nil { + // non-fatal, fallthrough + logrus.WithError(err).Info("Failed to fetch events") + } else { + logrus.Infof("Fetched %d/%d events from the database", len(events), len(eventIDs)) + if len(events) == len(eventIDs) { + result := make(map[string]*gomatrixserverlib.Event) + for i := range events { + result[events[i].EventID()] = &events[i] + b.eventIDMap[events[i].EventID()] = events[i] + } + return result, nil + } + } + + c := gomatrixserverlib.FederatedStateProvider{ + FedClient: b.fedClient, + RememberAuthEvents: false, + Server: b.servers[0], + } + result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs) + if err != nil { + return nil, err + } + for eventID, ev := range result { + b.eventIDMap[eventID] = *ev + } + return result, nil +} + +// ServersAtEvent is called when trying to determine which server to request from. +// It returns a list of servers which can be queried for backfill requests. These servers +// will be servers that are in the room already. The entries at the beginning are preferred servers +// and will be tried first. An empty list will fail the request. +func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID string) []gomatrixserverlib.ServerName { + // eventID will be a prev_event ID of a backwards extremity, meaning we will not have a database entry for it. Instead, use + // its successor, so look it up. + successor := "" +FindSuccessor: + for sucID, prevEventIDs := range b.bwExtrems { + for _, pe := range prevEventIDs { + if pe == eventID { + successor = sucID + break FindSuccessor + } + } + } + if successor == "" { + logrus.WithField("event_id", eventID).Error("ServersAtEvent: failed to find successor of this event to determine room state") + return nil + } + eventID = successor + + // getMembershipsBeforeEventNID requires a NID, so retrieving the NID for + // the event is necessary. + NIDs, err := b.db.EventNIDs(ctx, []string{eventID}) + if err != nil { + logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get event NID for event") + return nil + } + + stateEntries, err := stateBeforeEvent(ctx, b.db, NIDs[eventID]) + if err != nil { + logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to load state before event") + return nil + } + + // possibly return all joined servers depending on history visiblity + memberEventsFromVis, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries) + if err != nil { + logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules") + return nil + } + logrus.Infof("ServersAtEvent including %d current events from history visibility", len(memberEventsFromVis)) + + // Retrieve all "m.room.member" state events of "join" membership, which + // contains the list of users in the room before the event, therefore all + // the servers in it at that moment. + memberEvents, err := getMembershipsAtState(ctx, b.db, stateEntries, true) + if err != nil { + logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get memberships before event") + return nil + } + memberEvents = append(memberEvents, memberEventsFromVis...) + + // Store the server names in a temporary map to avoid duplicates. + serverSet := make(map[gomatrixserverlib.ServerName]bool) + for _, event := range memberEvents { + serverSet[event.Origin()] = true + } + var servers []gomatrixserverlib.ServerName + for server := range serverSet { + if server == b.thisServer { + continue + } + servers = append(servers, server) + } + b.servers = servers + return servers +} + +// Backfill performs a backfill request to the given server. +// https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid +func (b *backfillRequester) Backfill(ctx context.Context, server gomatrixserverlib.ServerName, roomID string, + fromEventIDs []string, limit int) (*gomatrixserverlib.Transaction, error) { + + tx, err := b.fedClient.Backfill(ctx, server, roomID, limit, fromEventIDs) + return &tx, err +} + +func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.Event, error) { + ctx := context.Background() + nidMap, err := b.db.EventNIDs(ctx, eventIDs) + if err != nil { + logrus.WithError(err).WithField("event_ids", eventIDs).Error("Failed to find events") + return nil, err + } + eventNIDs := make([]types.EventNID, len(nidMap)) + i := 0 + for _, nid := range nidMap { + eventNIDs[i] = nid + i++ + } + eventsWithNids, err := b.db.Events(ctx, eventNIDs) + if err != nil { + logrus.WithError(err).WithField("event_nids", eventNIDs).Error("Failed to load events") + return nil, err + } + events := make([]gomatrixserverlib.Event, len(eventsWithNids)) + for i := range eventsWithNids { + events[i] = eventsWithNids[i].Event + } + return events, nil +} + +// joinEventsFromHistoryVisibility returns all CURRENTLY joined members if the provided state indicated a 'shared' history visibility. +// TODO: Long term we probably want a history_visibility table which stores eventNID | visibility_enum so we can just +// pull all events and then filter by that table. +func joinEventsFromHistoryVisibility( + ctx context.Context, db storage.Database, roomID string, stateEntries []types.StateEntry) ([]types.Event, error) { + + var eventNIDs []types.EventNID + for _, entry := range stateEntries { + // Filter the events to retrieve to only keep the membership events + if entry.EventTypeNID == types.MRoomHistoryVisibilityNID && entry.EventStateKeyNID == types.EmptyStateKeyNID { + eventNIDs = append(eventNIDs, entry.EventNID) + break + } + } + + // Get all of the events in this state + stateEvents, err := db.Events(ctx, eventNIDs) + if err != nil { + return nil, err + } + events := make([]gomatrixserverlib.Event, len(stateEvents)) + for i := range stateEvents { + events[i] = stateEvents[i].Event + } + visibility := auth.HistoryVisibilityForRoom(events) + if visibility != "shared" { + logrus.Infof("ServersAtEvent history visibility not shared: %s", visibility) + return nil, nil + } + // get joined members + roomNID, err := db.RoomNID(ctx, roomID) + if err != nil { + return nil, err + } + joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, roomNID, true, false) + if err != nil { + return nil, err + } + return db.Events(ctx, joinEventNIDs) +} diff --git a/roomserver/internal/query.go b/roomserver/internal/query.go index aea93388..375ddc23 100644 --- a/roomserver/internal/query.go +++ b/roomserver/internal/query.go @@ -441,11 +441,11 @@ func (r *RoomserverInternalAPI) QueryMissingEvents( return err } -// QueryBackfill implements api.RoomServerQueryAPI -func (r *RoomserverInternalAPI) QueryBackfill( +// PerformBackfill implements api.RoomServerQueryAPI +func (r *RoomserverInternalAPI) PerformBackfill( ctx context.Context, - request *api.QueryBackfillRequest, - response *api.QueryBackfillResponse, + request *api.PerformBackfillRequest, + response *api.PerformBackfillResponse, ) error { // if we are requesting the backfill then we need to do a federation hit // TODO: we could be more sensible and fetch as many events we already have then request the rest @@ -489,7 +489,7 @@ func (r *RoomserverInternalAPI) QueryBackfill( return err } -func (r *RoomserverInternalAPI) backfillViaFederation(ctx context.Context, req *api.QueryBackfillRequest, res *api.QueryBackfillResponse) error { +func (r *RoomserverInternalAPI) backfillViaFederation(ctx context.Context, req *api.PerformBackfillRequest, res *api.PerformBackfillResponse) error { roomVer, err := r.DB.GetRoomVersionForRoom(ctx, req.RoomID) if err != nil { return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err) @@ -647,7 +647,7 @@ func (r *RoomserverInternalAPI) scanEventTree( var pre string // TODO: add tests for this function to ensure it meets the contract that callers expect (and doc what that is supposed to be) - // Currently, callers like QueryBackfill will call scanEventTree with a pre-populated `visited` map, assuming that by doing + // Currently, callers like PerformBackfill will call scanEventTree with a pre-populated `visited` map, assuming that by doing // so means that the events in that map will NOT be returned from this function. That is not currently true, resulting in // duplicate events being sent in response to /backfill requests. initialIgnoreList := make(map[string]bool, len(visited)) diff --git a/roomserver/internal/query_backfill.go b/roomserver/internal/query_backfill.go deleted file mode 100644 index 23ae9455..00000000 --- a/roomserver/internal/query_backfill.go +++ /dev/null @@ -1,305 +0,0 @@ -package internal - -import ( - "context" - - "github.com/matrix-org/dendrite/roomserver/auth" - "github.com/matrix-org/dendrite/roomserver/storage" - "github.com/matrix-org/dendrite/roomserver/types" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" - "github.com/sirupsen/logrus" -) - -// backfillRequester implements gomatrixserverlib.BackfillRequester -type backfillRequester struct { - db storage.Database - fedClient *gomatrixserverlib.FederationClient - thisServer gomatrixserverlib.ServerName - bwExtrems map[string][]string - - // per-request state - servers []gomatrixserverlib.ServerName - eventIDToBeforeStateIDs map[string][]string - eventIDMap map[string]gomatrixserverlib.Event -} - -func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName, bwExtrems map[string][]string) *backfillRequester { - return &backfillRequester{ - db: db, - fedClient: fedClient, - thisServer: thisServer, - eventIDToBeforeStateIDs: make(map[string][]string), - eventIDMap: make(map[string]gomatrixserverlib.Event), - bwExtrems: bwExtrems, - } -} - -func (b *backfillRequester) StateIDsBeforeEvent(ctx context.Context, targetEvent gomatrixserverlib.HeaderedEvent) ([]string, error) { - b.eventIDMap[targetEvent.EventID()] = targetEvent.Unwrap() - if ids, ok := b.eventIDToBeforeStateIDs[targetEvent.EventID()]; ok { - return ids, nil - } - if len(targetEvent.PrevEventIDs()) == 0 && targetEvent.Type() == "m.room.create" && targetEvent.StateKeyEquals("") { - util.GetLogger(ctx).WithField("room_id", targetEvent.RoomID()).Info("Backfilled to the beginning of the room") - b.eventIDToBeforeStateIDs[targetEvent.EventID()] = []string{} - return nil, nil - } - // if we have exactly 1 prev event and we know the state of the room at that prev event, then just roll forward the prev event. - // Else, we have to hit /state_ids because either we don't know the state at all at this event (new backwards extremity) or - // we don't know the result of state res to merge forks (2 or more prev_events) - if len(targetEvent.PrevEventIDs()) == 1 { - prevEventID := targetEvent.PrevEventIDs()[0] - prevEvent, ok := b.eventIDMap[prevEventID] - if !ok { - goto FederationHit - } - prevEventStateIDs, ok := b.eventIDToBeforeStateIDs[prevEventID] - if !ok { - goto FederationHit - } - newStateIDs := b.calculateNewStateIDs(targetEvent.Unwrap(), prevEvent, prevEventStateIDs) - if newStateIDs != nil { - b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs - return newStateIDs, nil - } - // else we failed to calculate the new state, so fallthrough - } - -FederationHit: - var lastErr error - logrus.WithField("event_id", targetEvent.EventID()).Info("Requesting /state_ids at event") - for _, srv := range b.servers { // hit any valid server - c := gomatrixserverlib.FederatedStateProvider{ - FedClient: b.fedClient, - RememberAuthEvents: false, - Server: srv, - } - res, err := c.StateIDsBeforeEvent(ctx, targetEvent) - if err != nil { - lastErr = err - continue - } - b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res - return res, nil - } - return nil, lastErr -} - -func (b *backfillRequester) calculateNewStateIDs(targetEvent, prevEvent gomatrixserverlib.Event, prevEventStateIDs []string) []string { - newStateIDs := prevEventStateIDs[:] - if prevEvent.StateKey() == nil { - // state is the same as the previous event - b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs - return newStateIDs - } - - missingState := false // true if we are missing the info for a state event ID - foundEvent := false // true if we found a (type, state_key) match - // find which state ID to replace, if any - for i, id := range newStateIDs { - ev, ok := b.eventIDMap[id] - if !ok { - missingState = true - continue - } - // The state IDs BEFORE the target event are the state IDs BEFORE the prev_event PLUS the prev_event itself - if ev.Type() == prevEvent.Type() && ev.StateKey() != nil && *ev.StateKey() == *prevEvent.StateKey() { - newStateIDs[i] = prevEvent.EventID() - foundEvent = true - break - } - } - if !foundEvent && !missingState { - // we can be certain that this is new state - newStateIDs = append(newStateIDs, prevEvent.EventID()) - foundEvent = true - } - - if foundEvent { - b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs - return newStateIDs - } - return nil -} - -func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatrixserverlib.RoomVersion, - event gomatrixserverlib.HeaderedEvent, eventIDs []string) (map[string]*gomatrixserverlib.Event, error) { - - // try to fetch the events from the database first - events, err := b.ProvideEvents(roomVer, eventIDs) - if err != nil { - // non-fatal, fallthrough - logrus.WithError(err).Info("Failed to fetch events") - } else { - logrus.Infof("Fetched %d/%d events from the database", len(events), len(eventIDs)) - if len(events) == len(eventIDs) { - result := make(map[string]*gomatrixserverlib.Event) - for i := range events { - result[events[i].EventID()] = &events[i] - b.eventIDMap[events[i].EventID()] = events[i] - } - return result, nil - } - } - - c := gomatrixserverlib.FederatedStateProvider{ - FedClient: b.fedClient, - RememberAuthEvents: false, - Server: b.servers[0], - } - result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs) - if err != nil { - return nil, err - } - for eventID, ev := range result { - b.eventIDMap[eventID] = *ev - } - return result, nil -} - -// ServersAtEvent is called when trying to determine which server to request from. -// It returns a list of servers which can be queried for backfill requests. These servers -// will be servers that are in the room already. The entries at the beginning are preferred servers -// and will be tried first. An empty list will fail the request. -func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID string) []gomatrixserverlib.ServerName { - // eventID will be a prev_event ID of a backwards extremity, meaning we will not have a database entry for it. Instead, use - // its successor, so look it up. - successor := "" -FindSuccessor: - for sucID, prevEventIDs := range b.bwExtrems { - for _, pe := range prevEventIDs { - if pe == eventID { - successor = sucID - break FindSuccessor - } - } - } - if successor == "" { - logrus.WithField("event_id", eventID).Error("ServersAtEvent: failed to find successor of this event to determine room state") - return nil - } - eventID = successor - - // getMembershipsBeforeEventNID requires a NID, so retrieving the NID for - // the event is necessary. - NIDs, err := b.db.EventNIDs(ctx, []string{eventID}) - if err != nil { - logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get event NID for event") - return nil - } - - stateEntries, err := stateBeforeEvent(ctx, b.db, NIDs[eventID]) - if err != nil { - logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to load state before event") - return nil - } - - // possibly return all joined servers depending on history visiblity - memberEventsFromVis, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries) - if err != nil { - logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules") - return nil - } - logrus.Infof("ServersAtEvent including %d current events from history visibility", len(memberEventsFromVis)) - - // Retrieve all "m.room.member" state events of "join" membership, which - // contains the list of users in the room before the event, therefore all - // the servers in it at that moment. - memberEvents, err := getMembershipsAtState(ctx, b.db, stateEntries, true) - if err != nil { - logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get memberships before event") - return nil - } - memberEvents = append(memberEvents, memberEventsFromVis...) - - // Store the server names in a temporary map to avoid duplicates. - serverSet := make(map[gomatrixserverlib.ServerName]bool) - for _, event := range memberEvents { - serverSet[event.Origin()] = true - } - var servers []gomatrixserverlib.ServerName - for server := range serverSet { - if server == b.thisServer { - continue - } - servers = append(servers, server) - } - b.servers = servers - return servers -} - -// Backfill performs a backfill request to the given server. -// https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid -func (b *backfillRequester) Backfill(ctx context.Context, server gomatrixserverlib.ServerName, roomID string, - fromEventIDs []string, limit int) (*gomatrixserverlib.Transaction, error) { - - tx, err := b.fedClient.Backfill(ctx, server, roomID, limit, fromEventIDs) - return &tx, err -} - -func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.Event, error) { - ctx := context.Background() - nidMap, err := b.db.EventNIDs(ctx, eventIDs) - if err != nil { - logrus.WithError(err).WithField("event_ids", eventIDs).Error("Failed to find events") - return nil, err - } - eventNIDs := make([]types.EventNID, len(nidMap)) - i := 0 - for _, nid := range nidMap { - eventNIDs[i] = nid - i++ - } - eventsWithNids, err := b.db.Events(ctx, eventNIDs) - if err != nil { - logrus.WithError(err).WithField("event_nids", eventNIDs).Error("Failed to load events") - return nil, err - } - events := make([]gomatrixserverlib.Event, len(eventsWithNids)) - for i := range eventsWithNids { - events[i] = eventsWithNids[i].Event - } - return events, nil -} - -// joinEventsFromHistoryVisibility returns all CURRENTLY joined members if the provided state indicated a 'shared' history visibility. -// TODO: Long term we probably want a history_visibility table which stores eventNID | visibility_enum so we can just -// pull all events and then filter by that table. -func joinEventsFromHistoryVisibility( - ctx context.Context, db storage.Database, roomID string, stateEntries []types.StateEntry) ([]types.Event, error) { - - var eventNIDs []types.EventNID - for _, entry := range stateEntries { - // Filter the events to retrieve to only keep the membership events - if entry.EventTypeNID == types.MRoomHistoryVisibilityNID && entry.EventStateKeyNID == types.EmptyStateKeyNID { - eventNIDs = append(eventNIDs, entry.EventNID) - break - } - } - - // Get all of the events in this state - stateEvents, err := db.Events(ctx, eventNIDs) - if err != nil { - return nil, err - } - events := make([]gomatrixserverlib.Event, len(stateEvents)) - for i := range stateEvents { - events[i] = stateEvents[i].Event - } - visibility := auth.HistoryVisibilityForRoom(events) - if visibility != "shared" { - logrus.Infof("ServersAtEvent history visibility not shared: %s", visibility) - return nil, nil - } - // get joined members - roomNID, err := db.RoomNID(ctx, roomID) - if err != nil { - return nil, err - } - joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, roomNID, true, false) - if err != nil { - return nil, err - } - return db.Events(ctx, joinEventNIDs) -} diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go index 6f5e882e..1244300d 100644 --- a/roomserver/inthttp/client.go +++ b/roomserver/inthttp/client.go @@ -24,8 +24,9 @@ const ( RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents" // Perform operations - RoomserverPerformJoinPath = "/roomserver/performJoin" - RoomserverPerformLeavePath = "/roomserver/performLeave" + RoomserverPerformJoinPath = "/roomserver/performJoin" + RoomserverPerformLeavePath = "/roomserver/performLeave" + RoomserverPerformBackfillPath = "/roomserver/performBackfill" // Query operations RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState" @@ -36,7 +37,6 @@ const ( RoomserverQueryServerAllowedToSeeEventPath = "/roomserver/queryServerAllowedToSeeEvent" RoomserverQueryMissingEventsPath = "/roomserver/queryMissingEvents" RoomserverQueryStateAndAuthChainPath = "/roomserver/queryStateAndAuthChain" - RoomserverQueryBackfillPath = "/roomserver/queryBackfill" RoomserverQueryRoomVersionCapabilitiesPath = "/roomserver/queryRoomVersionCapabilities" RoomserverQueryRoomVersionForRoomPath = "/roomserver/queryRoomVersionForRoom" ) @@ -274,16 +274,16 @@ func (h *httpRoomserverInternalAPI) QueryStateAndAuthChain( return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } -// QueryBackfill implements RoomServerQueryAPI -func (h *httpRoomserverInternalAPI) QueryBackfill( +// PerformBackfill implements RoomServerQueryAPI +func (h *httpRoomserverInternalAPI) PerformBackfill( ctx context.Context, - request *api.QueryBackfillRequest, - response *api.QueryBackfillResponse, + request *api.PerformBackfillRequest, + response *api.PerformBackfillResponse, ) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBackfill") + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformBackfill") defer span.Finish() - apiURL := h.roomserverURL + RoomserverQueryBackfillPath + apiURL := h.roomserverURL + RoomserverPerformBackfillPath return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } diff --git a/roomserver/inthttp/server.go b/roomserver/inthttp/server.go index 3a13ce37..8ac815f3 100644 --- a/roomserver/inthttp/server.go +++ b/roomserver/inthttp/server.go @@ -165,14 +165,14 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) { }), ) internalAPIMux.Handle( - RoomserverQueryBackfillPath, - internal.MakeInternalAPI("QueryBackfill", func(req *http.Request) util.JSONResponse { - var request api.QueryBackfillRequest - var response api.QueryBackfillResponse + RoomserverPerformBackfillPath, + internal.MakeInternalAPI("PerformBackfill", func(req *http.Request) util.JSONResponse { + var request api.PerformBackfillRequest + var response api.PerformBackfillResponse if err := json.NewDecoder(req.Body).Decode(&request); err != nil { return util.ErrorResponse(err) } - if err := r.QueryBackfill(req.Context(), &request, &response); err != nil { + if err := r.PerformBackfill(req.Context(), &request, &response); err != nil { return util.ErrorResponse(err) } return util.JSONResponse{Code: http.StatusOK, JSON: &response} -- cgit v1.2.3