about | summary | refs | log | tree | commit | diff
path: root/syncapi/routing
diff options
context:
space:
mode:
author Kegsay <kegan@matrix.org> 2020-03-24 12:20:10 +0000
committer GitHub <noreply@github.com> 2020-03-24 12:20:10 +0000
commit 6bac7e5efddea05aa68a56e44423d2bc157ec364 (patch)
tree c12cbf607cfbe9d7cc69bcfcbc58eaca2f8ab3e4 /syncapi/routing
parent 5a1a1ded1b5d8387f89e3d586729d175c1f1a1d0 (diff)
Implement backfill over federation (#938)
* Implement history visibility checks for /backfill

  Required for p2p to show history correctly.

* Add sytest

* Logging

* Fix two backfill bugs which prevented backfill from working correctly

  - When receiving backfill requests, do not send the event that was in the original request.
  - When storing backfill results, correctly update the backwards extremity for the room.

* hack: make backfill work multiple times

* add sqlite impl and remove logging

* Linting
Diffstat (limited to 'syncapi/routing')
-rw-r--r-- syncapi/routing/messages.go 176
1 files changed, 86 insertions, 90 deletions
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index cd21c3bd..5f2e4f17 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -28,7 +28,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
- log "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus"
)
type messagesReq struct {
@@ -151,6 +151,14 @@ func OnIncomingMessagesRequest(
util.GetLogger(req.Context()).WithError(err).Error("mreq.retrieveEvents failed")
return jsonerror.InternalServerError()
}
+ util.GetLogger(req.Context()).WithFields(logrus.Fields{
+ "from": from.String(),
+ "to": to.String(),
+ "limit": limit,
+ "backwards": backwardOrdering,
+ "return_start": start.String(),
+ "return_end": end.String(),
+ }).Info("Responding")
// Respond with the events.
return util.JSONResponse{
@@ -302,8 +310,9 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
events []gomatrixserverlib.HeaderedEvent, err error,
) {
// Check if we have enough events.
- isSetLargeEnough := true
- if len(streamEvents) < r.limit {
+ isSetLargeEnough := len(streamEvents) >= r.limit
+ if !isSetLargeEnough {
+ // it might be fine we don't have up to 'limit' events, let's find out
if r.backwardOrdering {
if r.wasToProvided {
// The condition in the SQL query is a strict "greater than" so
@@ -343,54 +352,6 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
return
}
-// containsBackwardExtremity checks if a slice of StreamEvent contains a
-// backward extremity. It does so by selecting the earliest event in the slice
-// and by checking the presence in the database of all of its parent events, and
-// considers the event itself a backward extremity if at least one of the parent
-// events doesn't exist in the database.
-// Returns an error if there was an issue with talking to the database.
-//
-// This function is unused but currently set to nolint for now until we are
-// absolutely sure that the changes in matrix-org/dendrite#847 are behaving
-// properly.
-// nolint:unused
-func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (bool, error) {
- // Select the earliest retrieved event.
- var ev *types.StreamEvent
- if r.backwardOrdering {
- ev = &(events[len(events)-1])
- } else {
- ev = &(events[0])
- }
- // Get the earliest retrieved event's parents.
- prevIDs := ev.PrevEventIDs()
- prevs, err := r.db.Events(r.ctx, prevIDs)
- if err != nil {
- return false, nil
- }
- // Check if we have all of the events we requested. If not, it means we've
- // reached a backward extremity.
- var eventInDB bool
- var id string
- // Iterate over the IDs we used in the request.
- for _, id = range prevIDs {
- eventInDB = false
- // Iterate over the events we got in response.
- for _, ev := range prevs {
- if ev.EventID() == id {
- eventInDB = true
- }
- }
- // One occurrence of one the event's parents not being present in the
- // database is enough to say that the event is a backward extremity.
- if !eventInDB {
- return true, nil
- }
- }
-
- return false, nil
-}
-
// backfill performs a backfill request over the federation on another
// homeserver in the room.
// See: https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid
@@ -401,6 +362,48 @@ func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (boo
// Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request.
func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
+ srvToBackfillFrom, err := r.serverToBackfillFrom(fromEventIDs)
+ if err != nil {
+ return nil, fmt.Errorf("Cannot find server to backfill from: %w", err)
+ }
+
+ pdus := make([]gomatrixserverlib.HeaderedEvent, 0)
+
+ // If the roomserver responded with at least one server that isn't us,
+ // send it a request for backfill.
+ util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("limit", limit).Info("Backfilling from server")
+ txn, err := r.federation.Backfill(
+ r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ for _, p := range txn.PDUs {
+ pdus = append(pdus, p.Headered(gomatrixserverlib.RoomVersionV1))
+ }
+ util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("new_events", len(pdus)).Info("Storing new events from backfill")
+
+ // Store the events in the database, while marking them as unfit to show
+ // up in responses to sync requests.
+ for _, pdu := range pdus {
+ headered := pdu.Headered(gomatrixserverlib.RoomVersionV1)
+ if _, err = r.db.WriteEvent(
+ r.ctx,
+ &headered,
+ []gomatrixserverlib.HeaderedEvent{},
+ []string{},
+ []string{},
+ nil, true,
+ ); err != nil {
+ return nil, err
+ }
+ }
+
+ return pdus, nil
+}
+
+func (r *messagesReq) serverToBackfillFrom(fromEventIDs []string) (gomatrixserverlib.ServerName, error) {
// Query the list of servers in the room when one of the backward extremities
// was sent.
var serversResponse api.QueryServersInRoomAtEventResponse
@@ -409,7 +412,33 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
EventID: fromEventIDs[0],
}
if err := r.queryAPI.QueryServersInRoomAtEvent(r.ctx, &serversRequest, &serversResponse); err != nil {
- return nil, err
+ util.GetLogger(r.ctx).WithError(err).Warn("Failed to query servers in room at event, falling back to event sender")
+ // FIXME: We shouldn't be doing this but in situations where we have already backfilled once
+ // the query API doesn't work as backfilled events do not make it to the room server.
+ // This means QueryServersInRoomAtEvent returns an error as it doesn't have the event ID in question.
+ // We need to inject backfilled events into the room server and store them appropriately.
+ events, err := r.db.Events(r.ctx, fromEventIDs)
+ if err != nil {
+ return "", err
+ }
+ if len(events) == 0 {
+ // should be impossible as these event IDs are backwards extremities
+ return "", fmt.Errorf("backfill: missing backwards extremities, event IDs: %s", fromEventIDs)
+ }
+ // The rationale here is that the last event was unlikely to be sent by us, so poke the server who sent it.
+ // We shouldn't be doing this really, but as a heuristic it should work pretty well for now.
+ for _, e := range events {
+ _, srv, srverr := gomatrixserverlib.SplitID('@', e.Sender())
+ if srverr != nil {
+ util.GetLogger(r.ctx).WithError(srverr).Warn("Failed to extract domain from event sender")
+ continue
+ }
+ if srv != r.cfg.Matrix.ServerName {
+ return srv, nil
+ }
+ }
+ // no valid events which have a remote server, fail.
+ return "", err
}
// Use the first server from the response, except if that server is us.
@@ -423,45 +452,11 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
if len(serversResponse.Servers) > 1 {
srvToBackfillFrom = serversResponse.Servers[1]
} else {
- srvToBackfillFrom = gomatrixserverlib.ServerName("")
- log.Warn("Not enough servers to backfill from")
- }
- }
-
- pdus := make([]gomatrixserverlib.HeaderedEvent, 0)
-
- // If the roomserver responded with at least one server that isn't us,
- // send it a request for backfill.
- if len(srvToBackfillFrom) > 0 {
- txn, err := r.federation.Backfill(
- r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs,
- )
- if err != nil {
- return nil, err
- }
-
- for _, p := range txn.PDUs {
- pdus = append(pdus, p.Headered(gomatrixserverlib.RoomVersionV1))
- }
-
- // Store the events in the database, while marking them as unfit to show
- // up in responses to sync requests.
- for _, pdu := range pdus {
- headered := pdu.Headered(gomatrixserverlib.RoomVersionV1)
- if _, err = r.db.WriteEvent(
- r.ctx,
- &headered,
- []gomatrixserverlib.HeaderedEvent{},
- []string{},
- []string{},
- nil, true,
- ); err != nil {
- return nil, err
- }
+ util.GetLogger(r.ctx).Info("Not enough servers to backfill from")
+ return "", nil
}
}
-
- return pdus, nil
+ return srvToBackfillFrom, nil
}
// setToDefault returns the default value for the "to" query parameter of a
@@ -475,7 +470,8 @@ func setToDefault(
roomID string,
) (to *types.PaginationToken, err error) {
if backwardOrdering {
- to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 1, 0)
+ // go 1 earlier than the first event so we correctly fetch the earliest event
+ to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 0, 0)
} else {
var pos types.StreamPosition
pos, err = db.MaxTopologicalPosition(ctx, roomID)