aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-08-25 14:42:47 +0200
committerGitHub <noreply@github.com>2022-08-25 13:42:47 +0100
commit07dd9bd9954d0740664651bf9755a81c3ba2d011 (patch)
tree4a547a8c575767efd66c4e6c0b326ded2f151a4d /syncapi
parent8ff3f1a7c9578a3a4f95755c4698da3219777097 (diff)
SyncAPI tweaks/fixes (#2671)
- Reverts 9dc57122d991d54ea6750448ba88c8763a569830 as it was causing issues https://github.com/matrix-org/dendrite/issues/2660 - Updates the GMSL `DefaultStateFilter` to use a limit of 20 events - Uses the timeline events to determine the new position instead of the state events
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/storage/interface.go3
-rw-r--r--syncapi/storage/shared/syncserver.go34
-rw-r--r--syncapi/streams/stream_pdu.go57
-rw-r--r--syncapi/sync/request.go28
-rw-r--r--syncapi/sync/requestpool.go2
-rw-r--r--syncapi/syncapi_test.go2
-rw-r--r--syncapi/types/types.go1
7 files changed, 61 insertions, 66 deletions
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index 43a75da9..0c8ba4e3 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -19,10 +19,11 @@ import (
"github.com/matrix-org/dendrite/internal/eventutil"
+ "github.com/matrix-org/gomatrixserverlib"
+
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
- "github.com/matrix-org/gomatrixserverlib"
)
type Database interface {
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index a46e5525..b06d2c6a 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -20,15 +20,18 @@ import (
"encoding/json"
"fmt"
+ "github.com/tidwall/gjson"
+
userapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/sirupsen/logrus"
+
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/sirupsen/logrus"
)
// Database is a temporary struct until we have made syncserver.go the same for both pq/sqlite
@@ -683,7 +686,7 @@ func (d *Database) GetStateDeltas(
ctx context.Context, device *userapi.Device,
r types.Range, userID string,
stateFilter *gomatrixserverlib.StateFilter,
-) ([]types.StateDelta, []string, error) {
+) (deltas []types.StateDelta, joinedRoomsIDs []string, err error) {
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
// - Get membership list changes for this user in this sync response
// - For each room which has membership list changes:
@@ -718,8 +721,6 @@ func (d *Database) GetStateDeltas(
}
}
- var deltas []types.StateDelta
-
// get all the state events ever (i.e. for all available rooms) between these two positions
stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter, allRoomIDs)
if err != nil {
@@ -767,15 +768,11 @@ func (d *Database) GetStateDeltas(
}
// handle newly joined rooms and non-joined rooms
+ newlyJoinedRooms := make(map[string]bool, len(state))
for roomID, stateStreamEvents := range state {
for _, ev := range stateStreamEvents {
- // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
- // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this,
- // dupe join events will result in the entire room state coming down to the client again. This is added in
- // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
- // the timeline.
- if membership := getMembershipFromEvent(ev.Event, userID); membership != "" {
- if membership == gomatrixserverlib.Join {
+ if membership, prevMembership := getMembershipFromEvent(ev.Event, userID); membership != "" {
+ if membership == gomatrixserverlib.Join && prevMembership != membership {
// send full room state down instead of a delta
var s []types.StreamEvent
s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilter)
@@ -786,6 +783,7 @@ func (d *Database) GetStateDeltas(
return nil, nil, err
}
state[roomID] = s
+ newlyJoinedRooms[roomID] = true
continue // we'll add this room in when we do joined rooms
}
@@ -806,6 +804,7 @@ func (d *Database) GetStateDeltas(
Membership: gomatrixserverlib.Join,
StateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]),
RoomID: joinedRoomID,
+ NewlyJoined: newlyJoinedRooms[joinedRoomID],
})
}
@@ -892,7 +891,7 @@ func (d *Database) GetStateDeltasForFullStateSync(
for roomID, stateStreamEvents := range state {
for _, ev := range stateStreamEvents {
- if membership := getMembershipFromEvent(ev.Event, userID); membership != "" {
+ if membership, _ := getMembershipFromEvent(ev.Event, userID); membership != "" {
if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above.
deltas[roomID] = types.StateDelta{
Membership: membership,
@@ -1003,15 +1002,16 @@ func (d *Database) CleanSendToDeviceUpdates(
// getMembershipFromEvent returns the value of content.membership iff the event is a state event
// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
-func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
+func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) (string, string) {
if ev.Type() != "m.room.member" || !ev.StateKeyEquals(userID) {
- return ""
+ return "", ""
}
membership, err := ev.Membership()
if err != nil {
- return ""
+ return "", ""
}
- return membership
+ prevMembership := gjson.GetBytes(ev.Unsigned(), "prev_content.membership").Str
+ return membership, prevMembership
}
// StoreReceipt stores user receipts
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 2818aad8..fa4c722c 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -209,11 +209,27 @@ func (p *PDUStreamProvider) IncrementalSync(
newPos = from
for _, delta := range stateDeltas {
+ newRange := r
+ // If this room was joined in this sync, try to fetch
+ // as much timeline events as allowed by the filter.
+ if delta.NewlyJoined {
+ // Reverse the range, so we get the most recent first.
+ // This will be limited by the eventFilter.
+ newRange = types.Range{
+ From: r.To,
+ To: 0,
+ Backwards: true,
+ }
+ }
var pos types.StreamPosition
- if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, &stateFilter, req.Response); err != nil {
+ if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
return to
}
+ // Reset the position, as it is only for the special case of newly joined rooms
+ if delta.NewlyJoined {
+ pos = newRange.From
+ }
switch {
case r.Backwards && pos < newPos:
fallthrough
@@ -222,37 +238,6 @@ func (p *PDUStreamProvider) IncrementalSync(
}
}
- // If we joined a new room in this sync, make sure we add enough information about it.
- // This does an "initial sync" for the newly joined rooms
- newlyJoinedRooms := joinedRooms(req.Response, req.Device.UserID)
- if len(newlyJoinedRooms) > 0 {
- // remove already added rooms, as we're doing an "initial sync"
- for _, x := range newlyJoinedRooms {
- delete(req.Response.Rooms.Join, x)
- }
- r = types.Range{
- From: to,
- To: 0,
- Backwards: true,
- }
- // We only care about the newly joined rooms, so update the stateFilter to reflect that
- stateFilter.Rooms = &newlyJoinedRooms
- if stateDeltas, _, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
- req.Log.WithError(err).Error("p.DB.GetStateDeltas failed")
- return newPos
- }
- for _, delta := range stateDeltas {
- // Ignore deltas for rooms we didn't newly join
- if _, ok := req.Response.Rooms.Join[delta.RoomID]; ok {
- continue
- }
- if _, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, &stateFilter, req.Response); err != nil {
- req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
- return newPos
- }
- }
- }
-
return newPos
}
@@ -340,12 +325,12 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
logrus.WithError(err).Error("unable to apply history visibility filter")
}
- if len(events) > 0 {
- updateLatestPosition(events[len(events)-1].EventID())
- }
if len(delta.StateEvents) > 0 {
updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID())
}
+ if len(events) > 0 {
+ updateLatestPosition(events[len(events)-1].EventID())
+ }
switch delta.Membership {
case gomatrixserverlib.Join:
@@ -418,6 +403,8 @@ func applyHistoryVisibilityFilter(
logrus.WithFields(logrus.Fields{
"duration": time.Since(startTime),
"room_id": roomID,
+ "before": len(recentEvents),
+ "after": len(events),
}).Debug("applied history visibility (sync)")
return events, nil
}
diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go
index 9d4740e9..268ed70c 100644
--- a/syncapi/sync/request.go
+++ b/syncapi/sync/request.go
@@ -23,12 +23,13 @@ import (
"strconv"
"time"
- "github.com/matrix-org/dendrite/syncapi/storage"
- "github.com/matrix-org/dendrite/syncapi/types"
- userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
+
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
)
const defaultSyncTimeout = time.Duration(0)
@@ -46,15 +47,9 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
return nil, err
}
}
- // TODO: read from stored filters too
+
+ // Create a default filter and apply a stored filter on top of it (if specified)
filter := gomatrixserverlib.DefaultFilter()
- if since.IsEmpty() {
- // Send as much account data down for complete syncs as possible
- // by default, otherwise clients do weird things while waiting
- // for the rest of the data to trickle down.
- filter.AccountData.Limit = math.MaxInt32
- filter.Room.AccountData.Limit = math.MaxInt32
- }
filterQuery := req.URL.Query().Get("filter")
if filterQuery != "" {
if filterQuery[0] == '{' {
@@ -76,6 +71,17 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
}
}
+ // A loaded filter might have overwritten these values,
+ // so set them after loading the filter.
+ if since.IsEmpty() {
+ // Send as much account data down for complete syncs as possible
+ // by default, otherwise clients do weird things while waiting
+ // for the rest of the data to trickle down.
+ filter.AccountData.Limit = math.MaxInt32
+ filter.Room.AccountData.Limit = math.MaxInt32
+ filter.Room.State.Limit = math.MaxInt32
+ }
+
logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{
"user_id": device.UserID,
"device_id": device.ID,
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index d908a962..c2c9616e 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -298,8 +298,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
return giveup()
case <-userStreamListener.GetNotifyChannel(syncReq.Since):
- syncReq.Log.Debugln("Responding to sync after wake-up")
currentPos.ApplyUpdates(userStreamListener.GetSyncPosition())
+ syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync after wake-up")
}
} else {
syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync immediately")
diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go
index 76d51c86..c81256aa 100644
--- a/syncapi/syncapi_test.go
+++ b/syncapi/syncapi_test.go
@@ -195,7 +195,7 @@ func TestSyncAPICreateRoomSyncEarly(t *testing.T) {
}
func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
- t.SkipNow() // Temporary?
+ t.Skip("Skipped, possibly fixed")
user := test.NewUser(t)
room := test.NewRoom(t, user)
alice := userapi.Device{
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index 39b085d9..d75d53ca 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -37,6 +37,7 @@ var (
type StateDelta struct {
RoomID string
StateEvents []*gomatrixserverlib.HeaderedEvent
+ NewlyJoined bool
Membership string
// The PDU stream position of the latest membership event for this user, if applicable.
// Can be 0 if there is no membership event in this delta.