aboutsummaryrefslogtreecommitdiff
path: root/syncapi/routing/messages.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/routing/messages.go')
-rw-r--r--syncapi/routing/messages.go108
1 files changed, 18 insertions, 90 deletions
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index 53a9a963..c48414ab 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -285,7 +285,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
// Check if we have backward extremities for this room.
if len(backwardExtremities) > 0 {
// If so, retrieve as much events as needed through backfilling.
- events, err = r.backfill(backwardExtremities, r.limit)
+ events, err = r.backfill(r.roomID, backwardExtremities, r.limit)
if err != nil {
return
}
@@ -334,7 +334,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering {
var pdus []gomatrixserverlib.HeaderedEvent
// Only ask the remote server for enough events to reach the limit.
- pdus, err = r.backfill(backwardExtremities, r.limit-len(streamEvents))
+ pdus, err = r.backfill(r.roomID, backwardExtremities, r.limit-len(streamEvents))
if err != nil {
return
}
@@ -358,45 +358,29 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
// event, or if there is no remote homeserver to contact.
// Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request.
-func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
- verReq := api.QueryRoomVersionForRoomRequest{RoomID: r.roomID}
- verRes := api.QueryRoomVersionForRoomResponse{}
- if err := r.queryAPI.QueryRoomVersionForRoom(r.ctx, &verReq, &verRes); err != nil {
- return nil, err
- }
-
- srvToBackfillFrom, err := r.serverToBackfillFrom(fromEventIDs)
- if err != nil {
- return nil, fmt.Errorf("Cannot find server to backfill from: %w", err)
- }
-
- headered := make([]gomatrixserverlib.HeaderedEvent, 0)
-
- // If the roomserver responded with at least one server that isn't us,
- // send it a request for backfill.
- util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("limit", limit).Info("Backfilling from server")
- txn, err := r.federation.Backfill(
- r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs,
- )
+func (r *messagesReq) backfill(roomID string, fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
+ var res api.QueryBackfillResponse
+ err := r.queryAPI.QueryBackfill(context.Background(), &api.QueryBackfillRequest{
+ RoomID: roomID,
+ EarliestEventsIDs: fromEventIDs,
+ Limit: limit,
+ ServerName: r.cfg.Matrix.ServerName,
+ }, &res)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("QueryBackfill failed: %w", err)
}
+ util.GetLogger(r.ctx).WithField("new_events", len(res.Events)).Info("Storing new events from backfill")
- for _, p := range txn.PDUs {
- event, e := gomatrixserverlib.NewEventFromUntrustedJSON(p, verRes.RoomVersion)
- if e != nil {
- continue
- }
- headered = append(headered, event.Headered(verRes.RoomVersion))
- }
- util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("new_events", len(headered)).Info("Storing new events from backfill")
+ // TODO: we should only be inserting events into the database from the roomserver's kafka output stream.
+ // Currently, this can race with live events for the room and cause problems. It's also just a bit unclear
+ // when you have multiple entry points to write events.
// Store the events in the database, while marking them as unfit to show
// up in responses to sync requests.
- for i := range headered {
+ for i := range res.Events {
if _, err = r.db.WriteEvent(
r.ctx,
- &headered[i],
+ &res.Events[i],
[]gomatrixserverlib.HeaderedEvent{},
[]string{},
[]string{},
@@ -406,63 +390,7 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
}
}
- return headered, nil
-}
-
-func (r *messagesReq) serverToBackfillFrom(fromEventIDs []string) (gomatrixserverlib.ServerName, error) {
- // Query the list of servers in the room when one of the backward extremities
- // was sent.
- var serversResponse api.QueryServersInRoomAtEventResponse
- serversRequest := api.QueryServersInRoomAtEventRequest{
- RoomID: r.roomID,
- EventID: fromEventIDs[0],
- }
- if err := r.queryAPI.QueryServersInRoomAtEvent(r.ctx, &serversRequest, &serversResponse); err != nil {
- util.GetLogger(r.ctx).WithError(err).Warn("Failed to query servers in room at event, falling back to event sender")
- // FIXME: We shouldn't be doing this but in situations where we have already backfilled once
- // the query API doesn't work as backfilled events do not make it to the room server.
- // This means QueryServersInRoomAtEvent returns an error as it doesn't have the event ID in question.
- // We need to inject backfilled events into the room server and store them appropriately.
- events, err := r.db.Events(r.ctx, fromEventIDs)
- if err != nil {
- return "", err
- }
- if len(events) == 0 {
- // should be impossible as these event IDs are backwards extremities
- return "", fmt.Errorf("backfill: missing backwards extremities, event IDs: %s", fromEventIDs)
- }
- // The rationale here is that the last event was unlikely to be sent by us, so poke the server who sent it.
- // We shouldn't be doing this really, but as a heuristic it should work pretty well for now.
- for _, e := range events {
- _, srv, srverr := gomatrixserverlib.SplitID('@', e.Sender())
- if srverr != nil {
- util.GetLogger(r.ctx).WithError(srverr).Warn("Failed to extract domain from event sender")
- continue
- }
- if srv != r.cfg.Matrix.ServerName {
- return srv, nil
- }
- }
- // no valid events which have a remote server, fail.
- return "", err
- }
-
- // Use the first server from the response, except if that server is us.
- // In that case, use the second one if the roomserver responded with
- // enough servers. If not, use an empty string to prevent the backfill
- // from happening as there's no server to direct the request towards.
- // TODO: Be smarter at selecting the server to direct the request
- // towards.
- srvToBackfillFrom := serversResponse.Servers[0]
- if srvToBackfillFrom == r.cfg.Matrix.ServerName {
- if len(serversResponse.Servers) > 1 {
- srvToBackfillFrom = serversResponse.Servers[1]
- } else {
- util.GetLogger(r.ctx).Info("Not enough servers to backfill from")
- return "", nil
- }
- }
- return srvToBackfillFrom, nil
+ return res.Events, nil
}
// setToDefault returns the default value for the "to" query parameter of a