aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go.sum6
-rw-r--r--roomserver/auth/auth.go70
-rw-r--r--roomserver/query/query.go89
-rw-r--r--syncapi/routing/messages.go176
-rw-r--r--syncapi/storage/postgres/backward_extremities_table.go58
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go1
-rw-r--r--syncapi/storage/postgres/syncserver.go21
-rw-r--r--syncapi/storage/sqlite3/backward_extremities_table.go72
-rw-r--r--syncapi/storage/sqlite3/syncserver.go17
-rw-r--r--syncapi/sync/requestpool.go13
-rw-r--r--sytest-whitelist1
11 files changed, 322 insertions, 202 deletions
diff --git a/go.sum b/go.sum
index 0f4542b2..48e0efd3 100644
--- a/go.sum
+++ b/go.sum
@@ -276,6 +276,8 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200318145320-bc896516d72a h1:7+
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318145320-bc896516d72a/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318150006-bc27294f9203 h1:7HkL6bF7/M2cYteNFVtvGW5qjD4wHIiR0HsdCm2Rqao=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318150006-bc27294f9203/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
+github.com/matrix-org/gomatrixserverlib v0.0.0-20200318180716-bc4ff56961e2 h1:y4DOMbhgPAnATHJ4lNxTWxIlJG0SlIPhvukx1sQkty4=
+github.com/matrix-org/gomatrixserverlib v0.0.0-20200318180716-bc4ff56961e2/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A=
github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8=
@@ -470,6 +472,8 @@ go.uber.org/atomic v1.3.0 h1:vs7fgriifsPbGdK3bNuMWapNn3qnZhCRXc19NRdq010=
go.uber.org/atomic v1.3.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
+go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@@ -491,6 +495,7 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -559,6 +564,7 @@ golang.org/x/tools v0.0.0-20190910044552-dd2b5c81c578/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20190911230505-6bfd74cf029c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20190912215617-3720d1ec3678 h1:rM1Udd0CgtYI3KUIhu9ROz0QCqjW+n/ODp/hH7c60Xc=
golang.org/x/tools v0.0.0-20190912215617-3720d1ec3678/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/roomserver/auth/auth.go b/roomserver/auth/auth.go
index 5ff1fada..615a94b3 100644
--- a/roomserver/auth/auth.go
+++ b/roomserver/auth/auth.go
@@ -12,18 +12,76 @@
package auth
-import "github.com/matrix-org/gomatrixserverlib"
+import (
+ "encoding/json"
-// IsServerAllowed returns true if there exists a event in authEvents
-// which allows server to view this event. That is true when a client on the server
-// can view the event. Otherwise returns false.
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// TODO: This logic should live in gomatrixserverlib
+
+// IsServerAllowed returns true if the server is allowed to see events in the room
+// at this particular state. This function implements https://matrix.org/docs/spec/client_server/r0.6.0#id87
func IsServerAllowed(
serverName gomatrixserverlib.ServerName,
+ serverCurrentlyInRoom bool,
authEvents []gomatrixserverlib.Event,
) bool {
+ historyVisibility := historyVisibilityForRoom(authEvents)
+
+ // 1. If the history_visibility was set to world_readable, allow.
+ if historyVisibility == "world_readable" {
+ return true
+ }
+ // 2. If the user's membership was join, allow.
+ joinedUserExists := IsAnyUserOnServerWithMembership(serverName, authEvents, gomatrixserverlib.Join)
+ if joinedUserExists {
+ return true
+ }
+ // 3. If history_visibility was set to shared, and the user joined the room at any point after the event was sent, allow.
+ if historyVisibility == "shared" && serverCurrentlyInRoom {
+ return true
+ }
+ // 4. If the user's membership was invite, and the history_visibility was set to invited, allow.
+ invitedUserExists := IsAnyUserOnServerWithMembership(serverName, authEvents, gomatrixserverlib.Invite)
+ if invitedUserExists && historyVisibility == "invited" {
+ return true
+ }
+
+ // 5. Otherwise, deny.
+ return false
+}
+
+func historyVisibilityForRoom(authEvents []gomatrixserverlib.Event) string {
+ // https://matrix.org/docs/spec/client_server/r0.6.0#id87
+ // By default if no history_visibility is set, or if the value is not understood, the visibility is assumed to be shared.
+ visibility := "shared"
+ knownStates := []string{"invited", "joined", "shared", "world_readable"}
+ for _, ev := range authEvents {
+ if ev.Type() != gomatrixserverlib.MRoomHistoryVisibility {
+ continue
+ }
+ // TODO: This should be HistoryVisibilityContent to match things like 'MemberContent'. Do this when moving to GMSL
+ content := struct {
+ HistoryVisibility string `json:"history_visibility"`
+ }{}
+ if err := json.Unmarshal(ev.Content(), &content); err != nil {
+ break // value is not understood
+ }
+ for _, s := range knownStates {
+ if s == content.HistoryVisibility {
+ visibility = s
+ break
+ }
+ }
+ }
+ return visibility
+}
+
+func IsAnyUserOnServerWithMembership(serverName gomatrixserverlib.ServerName, authEvents []gomatrixserverlib.Event, wantMembership string) bool {
for _, ev := range authEvents {
membership, err := ev.Membership()
- if err != nil || membership != gomatrixserverlib.Join {
+ if err != nil || membership != wantMembership {
continue
}
@@ -41,7 +99,5 @@ func IsServerAllowed(
return true
}
}
-
- // TODO: Check if history visibility is shared and if the server is currently in the room
return false
}
diff --git a/roomserver/query/query.go b/roomserver/query/query.go
index 2638919a..7fe9dc98 100644
--- a/roomserver/query/query.go
+++ b/roomserver/query/query.go
@@ -447,14 +447,26 @@ func (r *RoomserverQueryAPI) QueryServerAllowedToSeeEvent(
request *api.QueryServerAllowedToSeeEventRequest,
response *api.QueryServerAllowedToSeeEventResponse,
) (err error) {
+ events, err := r.DB.EventsFromIDs(ctx, []string{request.EventID})
+ if err != nil {
+ return
+ }
+ if len(events) == 0 {
+ response.AllowedToSeeEvent = false // event doesn't exist so not allowed to see
+ return
+ }
+ isServerInRoom, err := r.isServerCurrentlyInRoom(ctx, request.ServerName, events[0].RoomID())
+ if err != nil {
+ return
+ }
response.AllowedToSeeEvent, err = r.checkServerAllowedToSeeEvent(
- ctx, request.EventID, request.ServerName,
+ ctx, request.EventID, request.ServerName, isServerInRoom,
)
return
}
func (r *RoomserverQueryAPI) checkServerAllowedToSeeEvent(
- ctx context.Context, eventID string, serverName gomatrixserverlib.ServerName,
+ ctx context.Context, eventID string, serverName gomatrixserverlib.ServerName, isServerInRoom bool,
) (bool, error) {
roomState := state.NewStateResolution(r.DB)
stateEntries, err := roomState.LoadStateAtEvent(ctx, eventID)
@@ -469,7 +481,7 @@ func (r *RoomserverQueryAPI) checkServerAllowedToSeeEvent(
return false, err
}
- return auth.IsServerAllowed(serverName, stateAtEvent), nil
+ return auth.IsServerAllowed(serverName, isServerInRoom, stateAtEvent), nil
}
// QueryMissingEvents implements api.RoomserverQueryAPI
@@ -564,17 +576,55 @@ func (r *RoomserverQueryAPI) QueryBackfill(
return err
}
+func (r *RoomserverQueryAPI) isServerCurrentlyInRoom(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID string) (bool, error) {
+ roomNID, err := r.DB.RoomNID(ctx, roomID)
+ if err != nil {
+ return false, err
+ }
+
+ eventNIDs, err := r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, true)
+ if err != nil {
+ return false, err
+ }
+
+ events, err := r.DB.Events(ctx, eventNIDs)
+ if err != nil {
+ return false, err
+ }
+ gmslEvents := make([]gomatrixserverlib.Event, len(events))
+ for i := range events {
+ gmslEvents[i] = events[i].Event
+ }
+ return auth.IsAnyUserOnServerWithMembership(serverName, gmslEvents, gomatrixserverlib.Join), nil
+}
+
+// TODO: Remove this when we have tests to assert correctness of this function
+// nolint:gocyclo
func (r *RoomserverQueryAPI) scanEventTree(
ctx context.Context, front []string, visited map[string]bool, limit int,
serverName gomatrixserverlib.ServerName,
-) (resultNIDs []types.EventNID, err error) {
+) ([]types.EventNID, error) {
+ var resultNIDs []types.EventNID
+ var err error
var allowed bool
var events []types.Event
var next []string
var pre string
+ // TODO: add tests for this function to ensure it meets the contract that callers expect (and doc what that is supposed to be)
+ // Currently, callers like QueryBackfill will call scanEventTree with a pre-populated `visited` map, assuming that by doing
+ // so means that the events in that map will NOT be returned from this function. That is not currently true, resulting in
+ // duplicate events being sent in response to /backfill requests.
+ initialIgnoreList := make(map[string]bool, len(visited))
+ for k, v := range visited {
+ initialIgnoreList[k] = v
+ }
+
resultNIDs = make([]types.EventNID, 0, limit)
+ var checkedServerInRoom bool
+ var isServerInRoom bool
+
// Loop through the event IDs to retrieve the requested events and go
// through the whole tree (up to the provided limit) using the events'
// "prev_event" key.
@@ -587,7 +637,18 @@ BFSLoop:
// Retrieve the events to process from the database.
events, err = r.DB.EventsFromIDs(ctx, front)
if err != nil {
- return
+ return resultNIDs, err
+ }
+
+ if !checkedServerInRoom && len(events) > 0 {
+ // It's nasty that we have to extract the room ID from an event, but many federation requests
+ // only talk in event IDs, no room IDs at all (!!!)
+ ev := events[0]
+ isServerInRoom, err = r.isServerCurrentlyInRoom(ctx, serverName, ev.RoomID())
+ if err != nil {
+ util.GetLogger(ctx).WithError(err).Error("Failed to check if server is currently in room, assuming not.")
+ }
+ checkedServerInRoom = true
}
for _, ev := range events {
@@ -595,17 +656,23 @@ BFSLoop:
if len(resultNIDs) == limit {
break BFSLoop
}
- // Update the list of events to retrieve.
- resultNIDs = append(resultNIDs, ev.EventNID)
+
+ if !initialIgnoreList[ev.EventID()] {
+ // Update the list of events to retrieve.
+ resultNIDs = append(resultNIDs, ev.EventNID)
+ }
// Loop through the event's parents.
for _, pre = range ev.PrevEventIDs() {
// Only add an event to the list of next events to process if it
// hasn't been seen before.
if !visited[pre] {
visited[pre] = true
- allowed, err = r.checkServerAllowedToSeeEvent(ctx, pre, serverName)
+ allowed, err = r.checkServerAllowedToSeeEvent(ctx, pre, serverName, isServerInRoom)
if err != nil {
- return
+ util.GetLogger(ctx).WithField("server", serverName).WithField("event_id", pre).WithError(err).Error(
+ "Error checking if allowed to see event",
+ )
+ return resultNIDs, err
}
// If the event hasn't been seen before and the HS
@@ -613,6 +680,8 @@ BFSLoop:
// the list of events to retrieve.
if allowed {
next = append(next, pre)
+ } else {
+ util.GetLogger(ctx).WithField("server", serverName).WithField("event_id", pre).Info("Not allowed to see event")
}
}
}
@@ -621,7 +690,7 @@ BFSLoop:
front = next
}
- return
+ return resultNIDs, err
}
// QueryStateAndAuthChain implements api.RoomserverQueryAPI
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index cd21c3bd..5f2e4f17 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -28,7 +28,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
- log "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus"
)
type messagesReq struct {
@@ -151,6 +151,14 @@ func OnIncomingMessagesRequest(
util.GetLogger(req.Context()).WithError(err).Error("mreq.retrieveEvents failed")
return jsonerror.InternalServerError()
}
+ util.GetLogger(req.Context()).WithFields(logrus.Fields{
+ "from": from.String(),
+ "to": to.String(),
+ "limit": limit,
+ "backwards": backwardOrdering,
+ "return_start": start.String(),
+ "return_end": end.String(),
+ }).Info("Responding")
// Respond with the events.
return util.JSONResponse{
@@ -302,8 +310,9 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
events []gomatrixserverlib.HeaderedEvent, err error,
) {
// Check if we have enough events.
- isSetLargeEnough := true
- if len(streamEvents) < r.limit {
+ isSetLargeEnough := len(streamEvents) >= r.limit
+ if !isSetLargeEnough {
+ // it might be fine we don't have up to 'limit' events, let's find out
if r.backwardOrdering {
if r.wasToProvided {
// The condition in the SQL query is a strict "greater than" so
@@ -343,54 +352,6 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
return
}
-// containsBackwardExtremity checks if a slice of StreamEvent contains a
-// backward extremity. It does so by selecting the earliest event in the slice
-// and by checking the presence in the database of all of its parent events, and
-// considers the event itself a backward extremity if at least one of the parent
-// events doesn't exist in the database.
-// Returns an error if there was an issue with talking to the database.
-//
-// This function is unused but currently set to nolint for now until we are
-// absolutely sure that the changes in matrix-org/dendrite#847 are behaving
-// properly.
-// nolint:unused
-func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (bool, error) {
- // Select the earliest retrieved event.
- var ev *types.StreamEvent
- if r.backwardOrdering {
- ev = &(events[len(events)-1])
- } else {
- ev = &(events[0])
- }
- // Get the earliest retrieved event's parents.
- prevIDs := ev.PrevEventIDs()
- prevs, err := r.db.Events(r.ctx, prevIDs)
- if err != nil {
- return false, nil
- }
- // Check if we have all of the events we requested. If not, it means we've
- // reached a backward extremity.
- var eventInDB bool
- var id string
- // Iterate over the IDs we used in the request.
- for _, id = range prevIDs {
- eventInDB = false
- // Iterate over the events we got in response.
- for _, ev := range prevs {
- if ev.EventID() == id {
- eventInDB = true
- }
- }
- // One occurrence of one the event's parents not being present in the
- // database is enough to say that the event is a backward extremity.
- if !eventInDB {
- return true, nil
- }
- }
-
- return false, nil
-}
-
// backfill performs a backfill request over the federation on another
// homeserver in the room.
// See: https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid
@@ -401,6 +362,48 @@ func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (boo
// Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request.
func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
+ srvToBackfillFrom, err := r.serverToBackfillFrom(fromEventIDs)
+ if err != nil {
+ return nil, fmt.Errorf("Cannot find server to backfill from: %w", err)
+ }
+
+ pdus := make([]gomatrixserverlib.HeaderedEvent, 0)
+
+ // If the roomserver responded with at least one server that isn't us,
+ // send it a request for backfill.
+ util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("limit", limit).Info("Backfilling from server")
+ txn, err := r.federation.Backfill(
+ r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ for _, p := range txn.PDUs {
+ pdus = append(pdus, p.Headered(gomatrixserverlib.RoomVersionV1))
+ }
+ util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("new_events", len(pdus)).Info("Storing new events from backfill")
+
+ // Store the events in the database, while marking them as unfit to show
+ // up in responses to sync requests.
+ for _, pdu := range pdus {
+ headered := pdu.Headered(gomatrixserverlib.RoomVersionV1)
+ if _, err = r.db.WriteEvent(
+ r.ctx,
+ &headered,
+ []gomatrixserverlib.HeaderedEvent{},
+ []string{},
+ []string{},
+ nil, true,
+ ); err != nil {
+ return nil, err
+ }
+ }
+
+ return pdus, nil
+}
+
+func (r *messagesReq) serverToBackfillFrom(fromEventIDs []string) (gomatrixserverlib.ServerName, error) {
// Query the list of servers in the room when one of the backward extremities
// was sent.
var serversResponse api.QueryServersInRoomAtEventResponse
@@ -409,7 +412,33 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
EventID: fromEventIDs[0],
}
if err := r.queryAPI.QueryServersInRoomAtEvent(r.ctx, &serversRequest, &serversResponse); err != nil {
- return nil, err
+ util.GetLogger(r.ctx).WithError(err).Warn("Failed to query servers in room at event, falling back to event sender")
+ // FIXME: We shouldn't be doing this but in situations where we have already backfilled once
+ // the query API doesn't work as backfilled events do not make it to the room server.
+ // This means QueryServersInRoomAtEvent returns an error as it doesn't have the event ID in question.
+ // We need to inject backfilled events into the room server and store them appropriately.
+ events, err := r.db.Events(r.ctx, fromEventIDs)
+ if err != nil {
+ return "", err
+ }
+ if len(events) == 0 {
+ // should be impossible as these event IDs are backwards extremities
+ return "", fmt.Errorf("backfill: missing backwards extremities, event IDs: %s", fromEventIDs)
+ }
+ // The rationale here is that the last event was unlikely to be sent by us, so poke the server who sent it.
+ // We shouldn't be doing this really, but as a heuristic it should work pretty well for now.
+ for _, e := range events {
+ _, srv, srverr := gomatrixserverlib.SplitID('@', e.Sender())
+ if srverr != nil {
+ util.GetLogger(r.ctx).WithError(srverr).Warn("Failed to extract domain from event sender")
+ continue
+ }
+ if srv != r.cfg.Matrix.ServerName {
+ return srv, nil
+ }
+ }
+ // no valid events which have a remote server, fail.
+ return "", err
}
// Use the first server from the response, except if that server is us.
@@ -423,45 +452,11 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
if len(serversResponse.Servers) > 1 {
srvToBackfillFrom = serversResponse.Servers[1]
} else {
- srvToBackfillFrom = gomatrixserverlib.ServerName("")
- log.Warn("Not enough servers to backfill from")
- }
- }
-
- pdus := make([]gomatrixserverlib.HeaderedEvent, 0)
-
- // If the roomserver responded with at least one server that isn't us,
- // send it a request for backfill.
- if len(srvToBackfillFrom) > 0 {
- txn, err := r.federation.Backfill(
- r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs,
- )
- if err != nil {
- return nil, err
- }
-
- for _, p := range txn.PDUs {
- pdus = append(pdus, p.Headered(gomatrixserverlib.RoomVersionV1))
- }
-
- // Store the events in the database, while marking them as unfit to show
- // up in responses to sync requests.
- for _, pdu := range pdus {
- headered := pdu.Headered(gomatrixserverlib.RoomVersionV1)
- if _, err = r.db.WriteEvent(
- r.ctx,
- &headered,
- []gomatrixserverlib.HeaderedEvent{},
- []string{},
- []string{},
- nil, true,
- ); err != nil {
- return nil, err
- }
+ util.GetLogger(r.ctx).Info("Not enough servers to backfill from")
+ return "", nil
}
}
-
- return pdus, nil
+ return srvToBackfillFrom, nil
}
// setToDefault returns the default value for the "to" query parameter of a
@@ -475,7 +470,8 @@ func setToDefault(
roomID string,
) (to *types.PaginationToken, err error) {
if backwardOrdering {
- to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 1, 0)
+ // go 1 earlier than the first event so we correctly fetch the earliest event
+ to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 0, 0)
} else {
var pos types.StreamPosition
pos, err = db.MaxTopologicalPosition(ctx, roomID)
diff --git a/syncapi/storage/postgres/backward_extremities_table.go b/syncapi/storage/postgres/backward_extremities_table.go
index 8286ca43..b3ee28e0 100644
--- a/syncapi/storage/postgres/backward_extremities_table.go
+++ b/syncapi/storage/postgres/backward_extremities_table.go
@@ -21,39 +21,53 @@ import (
"github.com/matrix-org/dendrite/common"
)
+// The purpose of this table is to keep track of backwards extremities for a room.
+// Backwards extremities are the earliest (DAG-wise) known events which we have
+// the entire event JSON. These event IDs are used in federation requests to fetch
+// even earlier events.
+//
+// We persist the previous event IDs as well, one per row, so when we do fetch even
+// earlier events we can simply delete rows which referenced it. Consider the graph:
+// A
+// | Event C has 1 prev_event ID: A.
+// B C
+// |___| Event D has 2 prev_event IDs: B and C.
+// |
+// D
+// The earliest known event we have is D, so this table has 2 rows.
+// A backfill request gives us C but not B. We delete rows where prev_event=C. This
+// still means that D is a backwards extremity as we do not have event B. However, event
+// C is *also* a backwards extremity at this point as we do not have event A. Later,
+// when we fetch event B, we delete rows where prev_event=B. This then removes D as
+// a backwards extremity because there are no more rows with event_id=B.
const backwardExtremitiesSchema = `
-- Stores output room events received from the roomserver.
CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
-- The 'room_id' key for the event.
room_id TEXT NOT NULL,
- -- The event ID for the event.
+ -- The event ID for the last known event. This is the backwards extremity.
event_id TEXT NOT NULL,
+ -- The prev_events for the last known event. This is used to update extremities.
+ prev_event_id TEXT NOT NULL,
- PRIMARY KEY(room_id, event_id)
+ PRIMARY KEY(room_id, event_id, prev_event_id)
);
`
const insertBackwardExtremitySQL = "" +
- "INSERT INTO syncapi_backward_extremities (room_id, event_id)" +
- " VALUES ($1, $2)" +
+ "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" +
+ " VALUES ($1, $2, $3)" +
" ON CONFLICT DO NOTHING"
const selectBackwardExtremitiesForRoomSQL = "" +
"SELECT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
-const isBackwardExtremitySQL = "" +
- "SELECT EXISTS (" +
- " SELECT TRUE FROM syncapi_backward_extremities" +
- " WHERE room_id = $1 AND event_id = $2" +
- ")"
-
const deleteBackwardExtremitySQL = "" +
- "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND event_id = $2"
+ "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
type backwardExtremitiesStatements struct {
insertBackwardExtremityStmt *sql.Stmt
selectBackwardExtremitiesForRoomStmt *sql.Stmt
- isBackwardExtremityStmt *sql.Stmt
deleteBackwardExtremityStmt *sql.Stmt
}
@@ -68,9 +82,6 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) {
if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil {
return
}
- if s.isBackwardExtremityStmt, err = db.Prepare(isBackwardExtremitySQL); err != nil {
- return
- }
if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
return
}
@@ -78,17 +89,15 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) {
}
func (s *backwardExtremitiesStatements) insertsBackwardExtremity(
- ctx context.Context, roomID, eventID string,
+ ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string,
) (err error) {
- _, err = s.insertBackwardExtremityStmt.ExecContext(ctx, roomID, eventID)
+ _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID)
return
}
func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
ctx context.Context, roomID string,
) (eventIDs []string, err error) {
- eventIDs = make([]string, 0)
-
rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
if err != nil {
return
@@ -107,16 +116,9 @@ func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
return eventIDs, rows.Err()
}
-func (s *backwardExtremitiesStatements) isBackwardExtremity(
- ctx context.Context, roomID, eventID string,
-) (isBE bool, err error) {
- err = s.isBackwardExtremityStmt.QueryRowContext(ctx, roomID, eventID).Scan(&isBE)
- return
-}
-
func (s *backwardExtremitiesStatements) deleteBackwardExtremity(
- ctx context.Context, roomID, eventID string,
+ ctx context.Context, txn *sql.Tx, roomID, knownEventID string,
) (err error) {
- _, err = s.insertBackwardExtremityStmt.ExecContext(ctx, roomID, eventID)
+ _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
return
}
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 5f9a1d0c..0b53dfa9 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -305,7 +305,6 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
} else {
stmt = common.TxStmt(txn, s.selectRecentEventsStmt)
}
-
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
if err != nil {
return nil, err
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
index ccf1c565..f3f1aabc 100644
--- a/syncapi/storage/postgres/syncserver.go
+++ b/syncapi/storage/postgres/syncserver.go
@@ -111,22 +111,17 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([
return d.StreamEventsToEvents(nil, streamEvents), nil
}
-func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent) error {
- // If the event is already known as a backward extremity, don't consider
- // it as such anymore now that we have it.
- isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, ev.RoomID(), ev.EventID())
- if err != nil {
+// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of
+// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
+// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.
+func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
+ if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
return err
}
- if isBackwardExtremity {
- if err = d.backwardExtremities.deleteBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil {
- return err
- }
- }
// Check if we have all of the event's previous events. If an event is
// missing, add it to the room's backward extremities.
- prevEvents, err := d.events.selectEvents(ctx, nil, ev.PrevEventIDs())
+ prevEvents, err := d.events.selectEvents(ctx, txn, ev.PrevEventIDs())
if err != nil {
return err
}
@@ -141,7 +136,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev
// If the event is missing, consider it a backward extremity.
if !found {
- if err = d.backwardExtremities.insertsBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil {
+ if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil {
return err
}
}
@@ -174,7 +169,7 @@ func (d *SyncServerDatasource) WriteEvent(
return err
}
- if err = d.handleBackwardExtremities(ctx, ev); err != nil {
+ if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil {
return err
}
diff --git a/syncapi/storage/sqlite3/backward_extremities_table.go b/syncapi/storage/sqlite3/backward_extremities_table.go
index fcf15da2..0663e2a1 100644
--- a/syncapi/storage/sqlite3/backward_extremities_table.go
+++ b/syncapi/storage/sqlite3/backward_extremities_table.go
@@ -21,38 +21,53 @@ import (
"github.com/matrix-org/dendrite/common"
)
+// The purpose of this table is to keep track of backwards extremities for a room.
+// Backwards extremities are the earliest (DAG-wise) known events which we have
+// the entire event JSON. These event IDs are used in federation requests to fetch
+// even earlier events.
+//
+// We persist the previous event IDs as well, one per row, so when we do fetch even
+// earlier events we can simply delete rows which referenced it. Consider the graph:
+// A
+// | Event C has 1 prev_event ID: A.
+// B C
+// |___| Event D has 2 prev_event IDs: B and C.
+// |
+// D
+// The earliest known event we have is D, so this table has 2 rows.
+// A backfill request gives us C but not B. We delete rows where prev_event=C. This
+// still means that D is a backwards extremity as we do not have event B. However, event
+// C is *also* a backwards extremity at this point as we do not have event A. Later,
+// when we fetch event B, we delete rows where prev_event=B. This then removes D as
+// a backwards extremity because there are no more rows with event_id=B.
const backwardExtremitiesSchema = `
-- Stores output room events received from the roomserver.
CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
+ -- The 'room_id' key for the event.
room_id TEXT NOT NULL,
+ -- The event ID for the last known event. This is the backwards extremity.
event_id TEXT NOT NULL,
+ -- The prev_events for the last known event. This is used to update extremities.
+ prev_event_id TEXT NOT NULL,
- PRIMARY KEY(room_id, event_id)
+ PRIMARY KEY(room_id, event_id, prev_event_id)
);
`
const insertBackwardExtremitySQL = "" +
- "INSERT INTO syncapi_backward_extremities (room_id, event_id)" +
- " VALUES ($1, $2)" +
- " ON CONFLICT (room_id, event_id) DO NOTHING"
+ "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" +
+ " VALUES ($1, $2, $3)" +
+ " ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING"
const selectBackwardExtremitiesForRoomSQL = "" +
"SELECT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
-const isBackwardExtremitySQL = "" +
- "SELECT EXISTS (" +
- " SELECT TRUE FROM syncapi_backward_extremities" +
- " WHERE room_id = $1 AND event_id = $2" +
- ")"
-
const deleteBackwardExtremitySQL = "" +
- "DELETE FROM syncapi_backward_extremities" +
- " WHERE room_id = $1 AND event_id = $2"
+ "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
type backwardExtremitiesStatements struct {
insertBackwardExtremityStmt *sql.Stmt
selectBackwardExtremitiesForRoomStmt *sql.Stmt
- isBackwardExtremityStmt *sql.Stmt
deleteBackwardExtremityStmt *sql.Stmt
}
@@ -67,9 +82,6 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) {
if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil {
return
}
- if s.isBackwardExtremityStmt, err = db.Prepare(isBackwardExtremitySQL); err != nil {
- return
- }
if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
return
}
@@ -77,23 +89,20 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) {
}
func (s *backwardExtremitiesStatements) insertsBackwardExtremity(
- ctx context.Context, txn *sql.Tx, roomID, eventID string,
+ ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string,
) (err error) {
- stmt := common.TxStmt(txn, s.insertBackwardExtremityStmt)
- _, err = stmt.ExecContext(ctx, roomID, eventID)
+ _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID)
return
}
func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
- ctx context.Context, txn *sql.Tx, roomID string,
+ ctx context.Context, roomID string,
) (eventIDs []string, err error) {
- eventIDs = make([]string, 0)
-
- stmt := common.TxStmt(txn, s.selectBackwardExtremitiesForRoomStmt)
- rows, err := stmt.QueryContext(ctx, roomID)
+ rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
if err != nil {
return
}
+ defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed")
for rows.Next() {
var eID string
@@ -104,21 +113,12 @@ func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
eventIDs = append(eventIDs, eID)
}
- return
-}
-
-func (s *backwardExtremitiesStatements) isBackwardExtremity(
- ctx context.Context, txn *sql.Tx, roomID, eventID string,
-) (isBE bool, err error) {
- stmt := common.TxStmt(txn, s.isBackwardExtremityStmt)
- err = stmt.QueryRowContext(ctx, roomID, eventID).Scan(&isBE)
- return
+ return eventIDs, rows.Err()
}
func (s *backwardExtremitiesStatements) deleteBackwardExtremity(
- ctx context.Context, txn *sql.Tx, roomID, eventID string,
+ ctx context.Context, txn *sql.Tx, roomID, knownEventID string,
) (err error) {
- stmt := common.TxStmt(txn, s.deleteBackwardExtremityStmt)
- _, err = stmt.ExecContext(ctx, roomID, eventID)
+ _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
return
}
diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go
index a06fc91f..8ff18900 100644
--- a/syncapi/storage/sqlite3/syncserver.go
+++ b/syncapi/storage/sqlite3/syncserver.go
@@ -137,18 +137,13 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([
return d.StreamEventsToEvents(nil, streamEvents), nil
}
+// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of
+// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
+// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.
func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
- // If the event is already known as a backward extremity, don't consider
- // it as such anymore now that we have it.
- isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID())
- if err != nil {
+ if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
return err
}
- if isBackwardExtremity {
- if err = d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
- return err
- }
- }
// Check if we have all of the event's previous events. If an event is
// missing, add it to the room's backward extremities.
@@ -167,7 +162,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx
// If the event is missing, consider it a backward extremity.
if !found {
- if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
+ if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil {
return err
}
}
@@ -348,7 +343,7 @@ func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.Pagi
func (d *SyncServerDatasource) BackwardExtremitiesForRoom(
ctx context.Context, roomID string,
) (backwardExtremities []string, err error) {
- return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, nil, roomID)
+ return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, roomID)
}
// MaxTopologicalPosition returns the highest topological position for a given
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index b4ccbd27..69efd8aa 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -47,7 +47,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
var syncData *types.Response
// Extract values from request
- logger := util.GetLogger(req.Context())
userID := device.UserID
syncReq, err := newSyncRequest(req, *device)
if err != nil {
@@ -56,20 +55,21 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
JSON: jsonerror.Unknown(err.Error()),
}
}
- logger.WithFields(log.Fields{
+ logger := util.GetLogger(req.Context()).WithFields(log.Fields{
"userID": userID,
"since": syncReq.since,
"timeout": syncReq.timeout,
- }).Info("Incoming /sync request")
+ })
currPos := rp.notifier.CurrentPosition()
if shouldReturnImmediately(syncReq) {
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("rp.currentSyncForUser failed")
+ logger.WithError(err).Error("rp.currentSyncForUser failed")
return jsonerror.InternalServerError()
}
+ logger.WithField("next", syncData.NextBatch).Info("Responding immediately")
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncData,
@@ -107,7 +107,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
hasTimedOut = true
// Or for the request to be cancelled
case <-req.Context().Done():
- util.GetLogger(req.Context()).WithError(err).Error("request cancelled")
+ logger.WithError(err).Error("request cancelled")
return jsonerror.InternalServerError()
}
@@ -118,11 +118,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("rp.currentSyncForUser failed")
+ logger.WithError(err).Error("rp.currentSyncForUser failed")
return jsonerror.InternalServerError()
}
if !syncData.IsEmpty() || hasTimedOut {
+ logger.WithField("next", syncData.NextBatch).WithField("timed_out", hasTimedOut).Info("Responding")
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncData,
diff --git a/sytest-whitelist b/sytest-whitelist
index 03e11b83..95b000e1 100644
--- a/sytest-whitelist
+++ b/sytest-whitelist
@@ -220,3 +220,4 @@ Regular users can add and delete aliases when m.room.aliases is restricted
GET /r0/capabilities is not public
GET /joined_rooms lists newly-created room
/joined_rooms returns only joined rooms
+Message history can be paginated over federation