diff options
-rw-r--r-- | go.mod | 2 | ||||
-rw-r--r-- | go.sum | 4 | ||||
-rw-r--r-- | syncapi/storage/interface.go | 3 | ||||
-rw-r--r-- | syncapi/storage/shared/syncserver.go | 34 | ||||
-rw-r--r-- | syncapi/streams/stream_pdu.go | 57 | ||||
-rw-r--r-- | syncapi/sync/request.go | 28 | ||||
-rw-r--r-- | syncapi/sync/requestpool.go | 2 | ||||
-rw-r--r-- | syncapi/syncapi_test.go | 2 | ||||
-rw-r--r-- | syncapi/types/types.go | 1 |
9 files changed, 64 insertions, 69 deletions
@@ -21,7 +21,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 - github.com/matrix-org/gomatrixserverlib v0.0.0-20220815094957-74b7ff4ae09c + github.com/matrix-org/gomatrixserverlib v0.0.0-20220824082345-662dca17bf94 github.com/matrix-org/pinecone v0.0.0-20220803093810-b7a830c08fb9 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.13 @@ -343,8 +343,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220815094957-74b7ff4ae09c h1:GhKmb8s9iXA9qsFD1SbiRo6Ee7cnbfcgJQ/iy43wczM= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220815094957-74b7ff4ae09c/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220824082345-662dca17bf94 h1:zoTv/qxg7C/O995JBPvp+Z8KMR69HhB+M+P22A8Hmm0= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220824082345-662dca17bf94/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= github.com/matrix-org/pinecone v0.0.0-20220803093810-b7a830c08fb9 h1:ed8yvWhTLk7+sNeK/eOZRTvESFTOHDRevoRoyeqPtvY= github.com/matrix-org/pinecone v0.0.0-20220803093810-b7a830c08fb9/go.mod h1:P4MqPf+u83OPulPJ+XTbSDbbWrdFYNY4LZ/B1PIduFE= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 43a75da9..0c8ba4e3 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -19,10 +19,11 @@ import ( "github.com/matrix-org/dendrite/internal/eventutil" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrixserverlib" ) type Database interface { diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index a46e5525..b06d2c6a 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -20,15 +20,18 @@ import ( "encoding/json" "fmt" + "github.com/tidwall/gjson" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" + "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" ) // Database is a temporary struct until we have made syncserver.go the same for both pq/sqlite @@ -683,7 +686,7 @@ func (d *Database) GetStateDeltas( ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter, -) ([]types.StateDelta, []string, error) { +) (deltas []types.StateDelta, joinedRoomsIDs []string, err error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response // - For each room which has membership list changes: @@ -718,8 +721,6 @@ func (d *Database) GetStateDeltas( } } - var deltas []types.StateDelta - // get all the state events ever (i.e. for all available rooms) between these two positions stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter, allRoomIDs) if err != nil { @@ -767,15 +768,11 @@ func (d *Database) GetStateDeltas( } // handle newly joined rooms and non-joined rooms + newlyJoinedRooms := make(map[string]bool, len(state)) for roomID, stateStreamEvents := range state { for _, ev := range stateStreamEvents { - // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event. - // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this, - // dupe join events will result in the entire room state coming down to the client again. This is added in - // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to - // the timeline. - if membership := getMembershipFromEvent(ev.Event, userID); membership != "" { - if membership == gomatrixserverlib.Join { + if membership, prevMembership := getMembershipFromEvent(ev.Event, userID); membership != "" { + if membership == gomatrixserverlib.Join && prevMembership != membership { // send full room state down instead of a delta var s []types.StreamEvent s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilter) @@ -786,6 +783,7 @@ func (d *Database) GetStateDeltas( return nil, nil, err } state[roomID] = s + newlyJoinedRooms[roomID] = true continue // we'll add this room in when we do joined rooms } @@ -806,6 +804,7 @@ func (d *Database) GetStateDeltas( Membership: gomatrixserverlib.Join, StateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]), RoomID: joinedRoomID, + NewlyJoined: newlyJoinedRooms[joinedRoomID], }) } @@ -892,7 +891,7 @@ func (d *Database) GetStateDeltasForFullStateSync( for roomID, stateStreamEvents := range state { for _, ev := range stateStreamEvents { - if membership := getMembershipFromEvent(ev.Event, userID); membership != "" { + if membership, _ := getMembershipFromEvent(ev.Event, userID); membership != "" { if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above. deltas[roomID] = types.StateDelta{ Membership: membership, @@ -1003,15 +1002,16 @@ func (d *Database) CleanSendToDeviceUpdates( // getMembershipFromEvent returns the value of content.membership iff the event is a state event // with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned. -func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string { +func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) (string, string) { if ev.Type() != "m.room.member" || !ev.StateKeyEquals(userID) { - return "" + return "", "" } membership, err := ev.Membership() if err != nil { - return "" + return "", "" } - return membership + prevMembership := gjson.GetBytes(ev.Unsigned(), "prev_content.membership").Str + return membership, prevMembership } // StoreReceipt stores user receipts diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 2818aad8..fa4c722c 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -209,11 +209,27 @@ func (p *PDUStreamProvider) IncrementalSync( newPos = from for _, delta := range stateDeltas { + newRange := r + // If this room was joined in this sync, try to fetch + // as much timeline events as allowed by the filter. + if delta.NewlyJoined { + // Reverse the range, so we get the most recent first. + // This will be limited by the eventFilter. + newRange = types.Range{ + From: r.To, + To: 0, + Backwards: true, + } + } var pos types.StreamPosition - if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, &stateFilter, req.Response); err != nil { + if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") return to } + // Reset the position, as it is only for the special case of newly joined rooms + if delta.NewlyJoined { + pos = newRange.From + } switch { case r.Backwards && pos < newPos: fallthrough @@ -222,37 +238,6 @@ func (p *PDUStreamProvider) IncrementalSync( } } - // If we joined a new room in this sync, make sure we add enough information about it. - // This does an "initial sync" for the newly joined rooms - newlyJoinedRooms := joinedRooms(req.Response, req.Device.UserID) - if len(newlyJoinedRooms) > 0 { - // remove already added rooms, as we're doing an "initial sync" - for _, x := range newlyJoinedRooms { - delete(req.Response.Rooms.Join, x) - } - r = types.Range{ - From: to, - To: 0, - Backwards: true, - } - // We only care about the newly joined rooms, so update the stateFilter to reflect that - stateFilter.Rooms = &newlyJoinedRooms - if stateDeltas, _, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { - req.Log.WithError(err).Error("p.DB.GetStateDeltas failed") - return newPos - } - for _, delta := range stateDeltas { - // Ignore deltas for rooms we didn't newly join - if _, ok := req.Response.Rooms.Join[delta.RoomID]; ok { - continue - } - if _, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, &stateFilter, req.Response); err != nil { - req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") - return newPos - } - } - } - return newPos } @@ -340,12 +325,12 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( logrus.WithError(err).Error("unable to apply history visibility filter") } - if len(events) > 0 { - updateLatestPosition(events[len(events)-1].EventID()) - } if len(delta.StateEvents) > 0 { updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID()) } + if len(events) > 0 { + updateLatestPosition(events[len(events)-1].EventID()) + } switch delta.Membership { case gomatrixserverlib.Join: @@ -418,6 +403,8 @@ func applyHistoryVisibilityFilter( logrus.WithFields(logrus.Fields{ "duration": time.Since(startTime), "room_id": roomID, + "before": len(recentEvents), + "after": len(events), }).Debug("applied history visibility (sync)") return events, nil } diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index 9d4740e9..268ed70c 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -23,12 +23,13 @@ import ( "strconv" "time" - "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/types" - userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/sirupsen/logrus" + + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" ) const defaultSyncTimeout = time.Duration(0) @@ -46,15 +47,9 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat return nil, err } } - // TODO: read from stored filters too + + // Create a default filter and apply a stored filter on top of it (if specified) filter := gomatrixserverlib.DefaultFilter() - if since.IsEmpty() { - // Send as much account data down for complete syncs as possible - // by default, otherwise clients do weird things while waiting - // for the rest of the data to trickle down. - filter.AccountData.Limit = math.MaxInt32 - filter.Room.AccountData.Limit = math.MaxInt32 - } filterQuery := req.URL.Query().Get("filter") if filterQuery != "" { if filterQuery[0] == '{' { @@ -76,6 +71,17 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat } } + // A loaded filter might have overwritten these values, + // so set them after loading the filter. + if since.IsEmpty() { + // Send as much account data down for complete syncs as possible + // by default, otherwise clients do weird things while waiting + // for the rest of the data to trickle down. + filter.AccountData.Limit = math.MaxInt32 + filter.Room.AccountData.Limit = math.MaxInt32 + filter.Room.State.Limit = math.MaxInt32 + } + logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{ "user_id": device.UserID, "device_id": device.ID, diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index d908a962..c2c9616e 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -298,8 +298,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. return giveup() case <-userStreamListener.GetNotifyChannel(syncReq.Since): - syncReq.Log.Debugln("Responding to sync after wake-up") currentPos.ApplyUpdates(userStreamListener.GetSyncPosition()) + syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync after wake-up") } } else { syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync immediately") diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go index 76d51c86..c81256aa 100644 --- a/syncapi/syncapi_test.go +++ b/syncapi/syncapi_test.go @@ -195,7 +195,7 @@ func TestSyncAPICreateRoomSyncEarly(t *testing.T) { } func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) { - t.SkipNow() // Temporary? + t.Skip("Skipped, possibly fixed") user := test.NewUser(t) room := test.NewRoom(t, user) alice := userapi.Device{ diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 39b085d9..d75d53ca 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -37,6 +37,7 @@ var ( type StateDelta struct { RoomID string StateEvents []*gomatrixserverlib.HeaderedEvent + NewlyJoined bool Membership string // The PDU stream position of the latest membership event for this user, if applicable. // Can be 0 if there is no membership event in this delta. |