diff options
author | Kegsay <kegan@matrix.org> | 2020-03-24 12:20:10 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-24 12:20:10 +0000 |
commit | 6bac7e5efddea05aa68a56e44423d2bc157ec364 (patch) | |
tree | c12cbf607cfbe9d7cc69bcfcbc58eaca2f8ab3e4 /syncapi/routing | |
parent | 5a1a1ded1b5d8387f89e3d586729d175c1f1a1d0 (diff) |
Implement backfill over federation (#938)
* Implement history visibility checks for /backfill
Required for p2p to show history correctly.
* Add sytest
* Logging
* Fix two backfill bugs which prevented backfill from working correctly
- When receiving backfill requests, do not send the event that was in the original request.
- When storing backfill results, correctly update the backwards extremity for the room.
* hack: make backfill work multiple times
* add sqlite impl and remove logging
* Linting
Diffstat (limited to 'syncapi/routing')
-rw-r--r-- | syncapi/routing/messages.go | 176 |
1 files changed, 86 insertions, 90 deletions
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index cd21c3bd..5f2e4f17 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -28,7 +28,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" ) type messagesReq struct { @@ -151,6 +151,14 @@ func OnIncomingMessagesRequest( util.GetLogger(req.Context()).WithError(err).Error("mreq.retrieveEvents failed") return jsonerror.InternalServerError() } + util.GetLogger(req.Context()).WithFields(logrus.Fields{ + "from": from.String(), + "to": to.String(), + "limit": limit, + "backwards": backwardOrdering, + "return_start": start.String(), + "return_end": end.String(), + }).Info("Responding") // Respond with the events. return util.JSONResponse{ @@ -302,8 +310,9 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent events []gomatrixserverlib.HeaderedEvent, err error, ) { // Check if we have enough events. - isSetLargeEnough := true - if len(streamEvents) < r.limit { + isSetLargeEnough := len(streamEvents) >= r.limit + if !isSetLargeEnough { + // it might be fine we don't have up to 'limit' events, let's find out if r.backwardOrdering { if r.wasToProvided { // The condition in the SQL query is a strict "greater than" so @@ -343,54 +352,6 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent return } -// containsBackwardExtremity checks if a slice of StreamEvent contains a -// backward extremity. It does so by selecting the earliest event in the slice -// and by checking the presence in the database of all of its parent events, and -// considers the event itself a backward extremity if at least one of the parent -// events doesn't exist in the database. -// Returns an error if there was an issue with talking to the database. -// -// This function is unused but currently set to nolint for now until we are -// absolutely sure that the changes in matrix-org/dendrite#847 are behaving -// properly. -// nolint:unused -func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (bool, error) { - // Select the earliest retrieved event. - var ev *types.StreamEvent - if r.backwardOrdering { - ev = &(events[len(events)-1]) - } else { - ev = &(events[0]) - } - // Get the earliest retrieved event's parents. - prevIDs := ev.PrevEventIDs() - prevs, err := r.db.Events(r.ctx, prevIDs) - if err != nil { - return false, nil - } - // Check if we have all of the events we requested. If not, it means we've - // reached a backward extremity. - var eventInDB bool - var id string - // Iterate over the IDs we used in the request. - for _, id = range prevIDs { - eventInDB = false - // Iterate over the events we got in response. - for _, ev := range prevs { - if ev.EventID() == id { - eventInDB = true - } - } - // One occurrence of one the event's parents not being present in the - // database is enough to say that the event is a backward extremity. - if !eventInDB { - return true, nil - } - } - - return false, nil -} - // backfill performs a backfill request over the federation on another // homeserver in the room. // See: https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid @@ -401,6 +362,48 @@ func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (boo // Returns an error if there was an issue with retrieving the list of servers in // the room or sending the request. func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) { + srvToBackfillFrom, err := r.serverToBackfillFrom(fromEventIDs) + if err != nil { + return nil, fmt.Errorf("Cannot find server to backfill from: %w", err) + } + + pdus := make([]gomatrixserverlib.HeaderedEvent, 0) + + // If the roomserver responded with at least one server that isn't us, + // send it a request for backfill. + util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("limit", limit).Info("Backfilling from server") + txn, err := r.federation.Backfill( + r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs, + ) + if err != nil { + return nil, err + } + + for _, p := range txn.PDUs { + pdus = append(pdus, p.Headered(gomatrixserverlib.RoomVersionV1)) + } + util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("new_events", len(pdus)).Info("Storing new events from backfill") + + // Store the events in the database, while marking them as unfit to show + // up in responses to sync requests. + for _, pdu := range pdus { + headered := pdu.Headered(gomatrixserverlib.RoomVersionV1) + if _, err = r.db.WriteEvent( + r.ctx, + &headered, + []gomatrixserverlib.HeaderedEvent{}, + []string{}, + []string{}, + nil, true, + ); err != nil { + return nil, err + } + } + + return pdus, nil +} + +func (r *messagesReq) serverToBackfillFrom(fromEventIDs []string) (gomatrixserverlib.ServerName, error) { // Query the list of servers in the room when one of the backward extremities // was sent. var serversResponse api.QueryServersInRoomAtEventResponse @@ -409,7 +412,33 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv EventID: fromEventIDs[0], } if err := r.queryAPI.QueryServersInRoomAtEvent(r.ctx, &serversRequest, &serversResponse); err != nil { - return nil, err + util.GetLogger(r.ctx).WithError(err).Warn("Failed to query servers in room at event, falling back to event sender") + // FIXME: We shouldn't be doing this but in situations where we have already backfilled once + // the query API doesn't work as backfilled events do not make it to the room server. + // This means QueryServersInRoomAtEvent returns an error as it doesn't have the event ID in question. + // We need to inject backfilled events into the room server and store them appropriately. + events, err := r.db.Events(r.ctx, fromEventIDs) + if err != nil { + return "", err + } + if len(events) == 0 { + // should be impossible as these event IDs are backwards extremities + return "", fmt.Errorf("backfill: missing backwards extremities, event IDs: %s", fromEventIDs) + } + // The rationale here is that the last event was unlikely to be sent by us, so poke the server who sent it. + // We shouldn't be doing this really, but as a heuristic it should work pretty well for now. + for _, e := range events { + _, srv, srverr := gomatrixserverlib.SplitID('@', e.Sender()) + if srverr != nil { + util.GetLogger(r.ctx).WithError(srverr).Warn("Failed to extract domain from event sender") + continue + } + if srv != r.cfg.Matrix.ServerName { + return srv, nil + } + } + // no valid events which have a remote server, fail. + return "", err } // Use the first server from the response, except if that server is us. @@ -423,45 +452,11 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv if len(serversResponse.Servers) > 1 { srvToBackfillFrom = serversResponse.Servers[1] } else { - srvToBackfillFrom = gomatrixserverlib.ServerName("") - log.Warn("Not enough servers to backfill from") - } - } - - pdus := make([]gomatrixserverlib.HeaderedEvent, 0) - - // If the roomserver responded with at least one server that isn't us, - // send it a request for backfill. - if len(srvToBackfillFrom) > 0 { - txn, err := r.federation.Backfill( - r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs, - ) - if err != nil { - return nil, err - } - - for _, p := range txn.PDUs { - pdus = append(pdus, p.Headered(gomatrixserverlib.RoomVersionV1)) - } - - // Store the events in the database, while marking them as unfit to show - // up in responses to sync requests. - for _, pdu := range pdus { - headered := pdu.Headered(gomatrixserverlib.RoomVersionV1) - if _, err = r.db.WriteEvent( - r.ctx, - &headered, - []gomatrixserverlib.HeaderedEvent{}, - []string{}, - []string{}, - nil, true, - ); err != nil { - return nil, err - } + util.GetLogger(r.ctx).Info("Not enough servers to backfill from") + return "", nil } } - - return pdus, nil + return srvToBackfillFrom, nil } // setToDefault returns the default value for the "to" query parameter of a @@ -475,7 +470,8 @@ func setToDefault( roomID string, ) (to *types.PaginationToken, err error) { if backwardOrdering { - to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 1, 0) + // go 1 earlier than the first event so we correctly fetch the earliest event + to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 0, 0) } else { var pos types.StreamPosition pos, err = db.MaxTopologicalPosition(ctx, roomID) |