aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/consumers.go14
-rw-r--r--syncapi/consumers/keychange.go221
-rw-r--r--syncapi/consumers/keychange_test.go400
-rw-r--r--syncapi/types/types.go4
4 files changed, 621 insertions, 18 deletions
diff --git a/internal/consumers.go b/internal/consumers.go
index d7917f23..c000c171 100644
--- a/internal/consumers.go
+++ b/internal/consumers.go
@@ -58,11 +58,17 @@ var ErrShutdown = fmt.Errorf("shutdown")
// Returns nil once all the goroutines are started.
// Returns an error if it can't start consuming for any of the partitions.
func (c *ContinualConsumer) Start() error {
+ _, err := c.StartOffsets()
+ return err
+}
+
+// StartOffsets is the same as Start but returns the loaded offsets as well.
+func (c *ContinualConsumer) StartOffsets() ([]sqlutil.PartitionOffset, error) {
offsets := map[int32]int64{}
partitions, err := c.Consumer.Partitions(c.Topic)
if err != nil {
- return err
+ return nil, err
}
for _, partition := range partitions {
// Default all the offsets to the beginning of the stream.
@@ -71,7 +77,7 @@ func (c *ContinualConsumer) Start() error {
storedOffsets, err := c.PartitionStore.PartitionOffsets(context.TODO(), c.Topic)
if err != nil {
- return err
+ return nil, err
}
for _, offset := range storedOffsets {
// We've already processed events from this partition so advance the offset to where we got to.
@@ -87,7 +93,7 @@ func (c *ContinualConsumer) Start() error {
for _, p := range partitionConsumers {
p.Close() // nolint: errcheck
}
- return err
+ return nil, err
}
partitionConsumers = append(partitionConsumers, pc)
}
@@ -95,7 +101,7 @@ func (c *ContinualConsumer) Start() error {
go c.consumePartition(pc)
}
- return nil
+ return storedOffsets, nil
}
// consumePartition consumes the room events for a single partition of the kafkaesque stream.
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index 23961452..4a1c7309 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -17,13 +17,14 @@ package consumers
import (
"context"
"encoding/json"
+ "sync"
"github.com/Shopify/sarama"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal"
- "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
@@ -35,28 +36,33 @@ type OutputKeyChangeEventConsumer struct {
serverName gomatrixserverlib.ServerName // our server name
currentStateAPI currentstateAPI.CurrentStateInternalAPI
// keyAPI api.KeyInternalAPI
+ partitionToOffset map[int32]int64
+ partitionToOffsetMu sync.Mutex
}
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
// Call Start() to begin consuming from the key server.
func NewOutputKeyChangeEventConsumer(
- cfg *config.Dendrite,
+ serverName gomatrixserverlib.ServerName,
+ topic string,
kafkaConsumer sarama.Consumer,
currentStateAPI currentstateAPI.CurrentStateInternalAPI,
store storage.Database,
) *OutputKeyChangeEventConsumer {
consumer := internal.ContinualConsumer{
- Topic: string(cfg.Kafka.Topics.OutputKeyChangeEvent),
+ Topic: topic,
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputKeyChangeEventConsumer{
- keyChangeConsumer: &consumer,
- db: store,
- serverName: cfg.Matrix.ServerName,
- currentStateAPI: currentStateAPI,
+ keyChangeConsumer: &consumer,
+ db: store,
+ serverName: serverName,
+ currentStateAPI: currentStateAPI,
+ partitionToOffset: make(map[int32]int64),
+ partitionToOffsetMu: sync.Mutex{},
}
consumer.ProcessMessage = s.onMessage
@@ -66,10 +72,25 @@ func NewOutputKeyChangeEventConsumer(
// Start consuming from the key server
func (s *OutputKeyChangeEventConsumer) Start() error {
- return s.keyChangeConsumer.Start()
+ offsets, err := s.keyChangeConsumer.StartOffsets()
+ s.partitionToOffsetMu.Lock()
+ for _, o := range offsets {
+ s.partitionToOffset[o.Partition] = o.Offset
+ }
+ s.partitionToOffsetMu.Unlock()
+ return err
+}
+
+func (s *OutputKeyChangeEventConsumer) updateOffset(msg *sarama.ConsumerMessage) {
+ s.partitionToOffsetMu.Lock()
+ defer s.partitionToOffsetMu.Unlock()
+ s.partitionToOffset[msg.Partition] = msg.Offset
}
func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
+ defer func() {
+ s.updateOffset(msg)
+ }()
var output api.DeviceKeys
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
@@ -78,18 +99,190 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
}
// work out who we need to notify about the new key
var queryRes currentstateAPI.QuerySharedUsersResponse
- err := s.currentStateAPI.QuerySharedUsers(context.Background(), &currentstateAPI.QuerySharedUsersRequest{}, &queryRes)
+ err := s.currentStateAPI.QuerySharedUsers(context.Background(), &currentstateAPI.QuerySharedUsersRequest{
+ UserID: output.UserID,
+ }, &queryRes)
if err != nil {
log.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
return err
}
- // TODO: notify users by waking up streams
+ // TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams
return nil
}
-// Catchup returns a list of user IDs of users who have changed their device keys between the partition|offset given and now.
-// Returns the new offset for this partition.
-func (s *OutputKeyChangeEventConsumer) Catchup(parition int32, offset int64) (userIDs []string, newOffset int, err error) {
- //return s.keyAPI.QueryKeyChangeCatchup(ctx, partition, offset)
+// Catchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response
+// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
+// be already filled in with join/leave information.
+func (s *OutputKeyChangeEventConsumer) Catchup(
+ ctx context.Context, userID string, res *types.Response, tok types.StreamingToken,
+) (hasNew bool, err error) {
+ // Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
+ newlyJoinedRooms := joinedRooms(res, userID)
+ newlyLeftRooms := leftRooms(res)
+ if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
+ changed, left, err := s.trackChangedUsers(ctx, userID, newlyJoinedRooms, newlyLeftRooms)
+ if err != nil {
+ return false, err
+ }
+ res.DeviceLists.Changed = changed
+ res.DeviceLists.Left = left
+ hasNew = len(changed) > 0 || len(left) > 0
+ }
+
+ // TODO: now also track users who we already share rooms with but who have updated their devices between the two tokens
return
}
+
+func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) {
+ // work out who we are now sharing rooms with which we previously were not and notify them about the joining
+ // users keys:
+ changed, _, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), []string{ev.RoomID()}, nil)
+ if err != nil {
+ log.WithError(err).Error("OnJoinEvent: failed to work out changed users")
+ return
+ }
+ // TODO: f.e changed, wake up stream
+ for _, userID := range changed {
+ log.Infof("OnJoinEvent:Notify %s that %s should have device lists tracked", userID, *ev.StateKey())
+ }
+}
+
+func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) {
+ // work out who we are no longer sharing any rooms with and notify them about the leaving user
+ _, left, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), nil, []string{ev.RoomID()})
+ if err != nil {
+ log.WithError(err).Error("OnLeaveEvent: failed to work out left users")
+ return
+ }
+ // TODO: f.e left, wake up stream
+ for _, userID := range left {
+ log.Infof("OnLeaveEvent:Notify %s that %s should no longer track device lists", userID, *ev.StateKey())
+ }
+
+}
+
+// nolint:gocyclo
+func (s *OutputKeyChangeEventConsumer) trackChangedUsers(
+ ctx context.Context, userID string, newlyJoinedRooms, newlyLeftRooms []string,
+) (changed, left []string, err error) {
+ // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
+
+ // Leave algorithm:
+ // - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'.
+ // - Get users in newly left room. - QueryCurrentState
+ // - Loop set of users and decrement by 1 for each user in newly left room.
+ // - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync.
+ var queryRes currentstateAPI.QuerySharedUsersResponse
+ err = s.currentStateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
+ UserID: userID,
+ IncludeRoomIDs: newlyLeftRooms,
+ }, &queryRes)
+ if err != nil {
+ return nil, nil, err
+ }
+ var stateRes currentstateAPI.QueryBulkStateContentResponse
+ err = s.currentStateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
+ RoomIDs: newlyLeftRooms,
+ StateTuples: []gomatrixserverlib.StateKeyTuple{
+ {
+ EventType: gomatrixserverlib.MRoomMember,
+ StateKey: "*",
+ },
+ },
+ AllowWildcards: true,
+ }, &stateRes)
+ if err != nil {
+ return nil, nil, err
+ }
+ for _, state := range stateRes.Rooms {
+ for tuple, membership := range state {
+ if membership != gomatrixserverlib.Join {
+ continue
+ }
+ queryRes.UserIDsToCount[tuple.StateKey]--
+ }
+ }
+ for userID, count := range queryRes.UserIDsToCount {
+ if count <= 0 {
+ left = append(left, userID) // left is returned
+ }
+ }
+
+ // Join algorithm:
+ // - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'.
+ // - Get users in newly joined room - QueryCurrentState
+ // - Loop set of users in newly joined room, do they appear in the set of users prior to joining?
+ // - If yes: then they already shared a room in common, do nothing.
+ // - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]'
+ err = s.currentStateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
+ UserID: userID,
+ ExcludeRoomIDs: newlyJoinedRooms,
+ }, &queryRes)
+ if err != nil {
+ return nil, left, err
+ }
+ err = s.currentStateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
+ RoomIDs: newlyJoinedRooms,
+ StateTuples: []gomatrixserverlib.StateKeyTuple{
+ {
+ EventType: gomatrixserverlib.MRoomMember,
+ StateKey: "*",
+ },
+ },
+ AllowWildcards: true,
+ }, &stateRes)
+ if err != nil {
+ return nil, left, err
+ }
+ for _, state := range stateRes.Rooms {
+ for tuple, membership := range state {
+ if membership != gomatrixserverlib.Join {
+ continue
+ }
+ // new user who we weren't previously sharing rooms with
+ if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok {
+ changed = append(changed, tuple.StateKey) // changed is returned
+ }
+ }
+ }
+ return changed, left, nil
+}
+
+func joinedRooms(res *types.Response, userID string) []string {
+ var roomIDs []string
+ for roomID, join := range res.Rooms.Join {
+ // we would expect to see our join event somewhere if we newly joined the room.
+ // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'.
+ newlyJoined := membershipEventPresent(join.State.Events, userID)
+ if newlyJoined {
+ roomIDs = append(roomIDs, roomID)
+ continue
+ }
+ newlyJoined = membershipEventPresent(join.Timeline.Events, userID)
+ if newlyJoined {
+ roomIDs = append(roomIDs, roomID)
+ }
+ }
+ return roomIDs
+}
+
+func leftRooms(res *types.Response) []string {
+ roomIDs := make([]string, len(res.Rooms.Leave))
+ i := 0
+ for roomID := range res.Rooms.Leave {
+ roomIDs[i] = roomID
+ i++
+ }
+ return roomIDs
+}
+
+func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool {
+ for _, ev := range events {
+ // it's enough to know that we have our member event here, don't need to check membership content
+ // as it's implied by being in the respective section of the sync response.
+ if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
+ return true
+ }
+ }
+ return false
+}
diff --git a/syncapi/consumers/keychange_test.go b/syncapi/consumers/keychange_test.go
new file mode 100644
index 00000000..9e7ede1f
--- /dev/null
+++ b/syncapi/consumers/keychange_test.go
@@ -0,0 +1,400 @@
+package consumers
+
+import (
+ "context"
+ "reflect"
+ "sort"
+ "testing"
+
+ "github.com/matrix-org/dendrite/currentstateserver/api"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+var (
+ syncingUser = "@alice:localhost"
+)
+
+type mockCurrentStateAPI struct {
+ roomIDToJoinedMembers map[string][]string
+}
+
+func (s *mockCurrentStateAPI) QueryCurrentState(ctx context.Context, req *api.QueryCurrentStateRequest, res *api.QueryCurrentStateResponse) error {
+ return nil
+}
+
+// QueryRoomsForUser retrieves a list of room IDs matching the given query.
+func (s *mockCurrentStateAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error {
+ return nil
+}
+
+// QueryBulkStateContent does a bulk query for state event content in the given rooms.
+func (s *mockCurrentStateAPI) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkStateContentRequest, res *api.QueryBulkStateContentResponse) error {
+ res.Rooms = make(map[string]map[gomatrixserverlib.StateKeyTuple]string)
+ if req.AllowWildcards && len(req.StateTuples) == 1 && req.StateTuples[0].EventType == gomatrixserverlib.MRoomMember && req.StateTuples[0].StateKey == "*" {
+ for _, roomID := range req.RoomIDs {
+ res.Rooms[roomID] = make(map[gomatrixserverlib.StateKeyTuple]string)
+ for _, userID := range s.roomIDToJoinedMembers[roomID] {
+ res.Rooms[roomID][gomatrixserverlib.StateKeyTuple{
+ EventType: gomatrixserverlib.MRoomMember,
+ StateKey: userID,
+ }] = "join"
+ }
+ }
+ }
+ return nil
+}
+
+// QuerySharedUsers returns a list of users who share at least 1 room in common with the given user.
+func (s *mockCurrentStateAPI) QuerySharedUsers(ctx context.Context, req *api.QuerySharedUsersRequest, res *api.QuerySharedUsersResponse) error {
+ roomsToQuery := req.IncludeRoomIDs
+ for roomID, members := range s.roomIDToJoinedMembers {
+ exclude := false
+ for _, excludeRoomID := range req.ExcludeRoomIDs {
+ if roomID == excludeRoomID {
+ exclude = true
+ break
+ }
+ }
+ if exclude {
+ continue
+ }
+ for _, userID := range members {
+ if userID == req.UserID {
+ roomsToQuery = append(roomsToQuery, roomID)
+ break
+ }
+ }
+ }
+
+ res.UserIDsToCount = make(map[string]int)
+ for _, roomID := range roomsToQuery {
+ for _, userID := range s.roomIDToJoinedMembers[roomID] {
+ res.UserIDsToCount[userID]++
+ }
+ }
+ return nil
+}
+
+type wantCatchup struct {
+ hasNew bool
+ changed []string
+ left []string
+}
+
+func assertCatchup(t *testing.T, hasNew bool, syncResponse *types.Response, want wantCatchup) {
+ if hasNew != want.hasNew {
+ t.Errorf("got hasNew=%v want %v", hasNew, want.hasNew)
+ }
+ sort.Strings(syncResponse.DeviceLists.Left)
+ if !reflect.DeepEqual(syncResponse.DeviceLists.Left, want.left) {
+ t.Errorf("device_lists.left got %v want %v", syncResponse.DeviceLists.Left, want.left)
+ }
+ sort.Strings(syncResponse.DeviceLists.Changed)
+ if !reflect.DeepEqual(syncResponse.DeviceLists.Changed, want.changed) {
+ t.Errorf("device_lists.changed got %v want %v", syncResponse.DeviceLists.Changed, want.changed)
+ }
+}
+
+func joinResponseWithRooms(syncResponse *types.Response, userID string, roomIDs []string) *types.Response {
+ for _, roomID := range roomIDs {
+ roomEvents := []gomatrixserverlib.ClientEvent{
+ {
+ Type: "m.room.member",
+ StateKey: &userID,
+ EventID: "$something:here",
+ Sender: userID,
+ RoomID: roomID,
+ Content: []byte(`{"membership":"join"}`),
+ },
+ }
+
+ jr := syncResponse.Rooms.Join[roomID]
+ jr.State.Events = roomEvents
+ syncResponse.Rooms.Join[roomID] = jr
+ }
+ return syncResponse
+}
+
+func leaveResponseWithRooms(syncResponse *types.Response, userID string, roomIDs []string) *types.Response {
+ for _, roomID := range roomIDs {
+ roomEvents := []gomatrixserverlib.ClientEvent{
+ {
+ Type: "m.room.member",
+ StateKey: &userID,
+ EventID: "$something:here",
+ Sender: userID,
+ RoomID: roomID,
+ Content: []byte(`{"membership":"leave"}`),
+ },
+ }
+
+ lr := syncResponse.Rooms.Leave[roomID]
+ lr.Timeline.Events = roomEvents
+ syncResponse.Rooms.Leave[roomID] = lr
+ }
+ return syncResponse
+}
+
+// tests that joining a room which results in sharing a new user includes that user in `changed`
+func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
+ newShareUser := "@bill:localhost"
+ newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNewUser:bar"
+ consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{
+ roomIDToJoinedMembers: map[string][]string{
+ newlyJoinedRoom: {syncingUser, newShareUser},
+ "!another:room": {syncingUser},
+ },
+ }, nil)
+ syncResponse := types.NewResponse()
+ syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
+
+ hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
+ if err != nil {
+ t.Fatalf("Catchup returned an error: %s", err)
+ }
+ assertCatchup(t, hasNew, syncResponse, wantCatchup{
+ hasNew: true,
+ changed: []string{newShareUser},
+ })
+}
+
+// tests that leaving a room which results in sharing no rooms with a user includes that user in `left`
+func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
+ removeUser := "@bill:localhost"
+ newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareLeftUser:bar"
+ consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{
+ roomIDToJoinedMembers: map[string][]string{
+ newlyLeftRoom: {removeUser},
+ "!another:room": {syncingUser},
+ },
+ }, nil)
+ syncResponse := types.NewResponse()
+ syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
+
+ hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
+ if err != nil {
+ t.Fatalf("Catchup returned an error: %s", err)
+ }
+ assertCatchup(t, hasNew, syncResponse, wantCatchup{
+ hasNew: true,
+ left: []string{removeUser},
+ })
+}
+
+// tests that joining a room which doesn't result in sharing a new user results in no changes.
+func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
+ existingUser := "@bob:localhost"
+ newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNoNewUsers:bar"
+ consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{
+ roomIDToJoinedMembers: map[string][]string{
+ newlyJoinedRoom: {syncingUser, existingUser},
+ "!another:room": {syncingUser, existingUser},
+ },
+ }, nil)
+ syncResponse := types.NewResponse()
+ syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
+
+ hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
+ if err != nil {
+ t.Fatalf("Catchup returned an error: %s", err)
+ }
+ assertCatchup(t, hasNew, syncResponse, wantCatchup{
+ hasNew: false,
+ })
+}
+
+// tests that leaving a room which doesn't result in sharing no rooms with a user results in no changes.
+func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
+ existingUser := "@bob:localhost"
+ newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareNoUsers:bar"
+ consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{
+ roomIDToJoinedMembers: map[string][]string{
+ newlyLeftRoom: {existingUser},
+ "!another:room": {syncingUser, existingUser},
+ },
+ }, nil)
+ syncResponse := types.NewResponse()
+ syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
+
+ hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
+ if err != nil {
+ t.Fatalf("Catchup returned an error: %s", err)
+ }
+ assertCatchup(t, hasNew, syncResponse, wantCatchup{
+ hasNew: false,
+ })
+}
+
+// tests that not joining any rooms (but having messages in the response) do not result in changes.
+func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
+ existingUser := "@bob1:localhost"
+ roomID := "!TestKeyChangeCatchupNoNewJoinsButMessages:bar"
+ consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{
+ roomIDToJoinedMembers: map[string][]string{
+ roomID: {syncingUser, existingUser},
+ },
+ }, nil)
+ syncResponse := types.NewResponse()
+ empty := ""
+ roomStateEvents := []gomatrixserverlib.ClientEvent{
+ {
+ Type: "m.room.name",
+ StateKey: &empty,
+ EventID: "$something:here",
+ Sender: existingUser,
+ RoomID: roomID,
+ Content: []byte(`{"name":"The Room Name"}`),
+ },
+ }
+ roomTimelineEvents := []gomatrixserverlib.ClientEvent{
+ {
+ Type: "m.room.message",
+ EventID: "$something1:here",
+ Sender: existingUser,
+ RoomID: roomID,
+ Content: []byte(`{"body":"Message 1"}`),
+ },
+ {
+ Type: "m.room.message",
+ EventID: "$something2:here",
+ Sender: syncingUser,
+ RoomID: roomID,
+ Content: []byte(`{"body":"Message 2"}`),
+ },
+ {
+ Type: "m.room.message",
+ EventID: "$something3:here",
+ Sender: existingUser,
+ RoomID: roomID,
+ Content: []byte(`{"body":"Message 3"}`),
+ },
+ }
+
+ jr := syncResponse.Rooms.Join[roomID]
+ jr.State.Events = roomStateEvents
+ jr.Timeline.Events = roomTimelineEvents
+ syncResponse.Rooms.Join[roomID] = jr
+
+ hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
+ if err != nil {
+ t.Fatalf("Catchup returned an error: %s", err)
+ }
+ assertCatchup(t, hasNew, syncResponse, wantCatchup{
+ hasNew: false,
+ })
+}
+
+// tests that joining/leaving multiple rooms can result in both `changed` and `left` and they are not duplicated.
+func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
+ newShareUser := "@berta:localhost"
+ newShareUser2 := "@bobby:localhost"
+ newlyLeftUser := "@charlie:localhost"
+ newlyLeftUser2 := "@debra:localhost"
+ newlyJoinedRoom := "!join:bar"
+ newlyLeftRoom := "!left:bar"
+ consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{
+ roomIDToJoinedMembers: map[string][]string{
+ newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2},
+ newlyLeftRoom: {newlyLeftUser, newlyLeftUser2},
+ "!another:room": {syncingUser},
+ },
+ }, nil)
+ syncResponse := types.NewResponse()
+ syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
+ syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
+
+ hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
+ if err != nil {
+ t.Fatalf("Catchup returned an error: %s", err)
+ }
+ assertCatchup(t, hasNew, syncResponse, wantCatchup{
+ hasNew: true,
+ changed: []string{newShareUser, newShareUser2},
+ left: []string{newlyLeftUser, newlyLeftUser2},
+ })
+}
+
+// tests that joining/leaving the SAME room puts users in `left` if the final state is leave.
+// NB: Consider the case:
+// - Alice and Bob are in a room.
+// - Alice goes offline, Charlie joins, sends encrypted messages then leaves the room.
+// - Alice comes back online. Technically nothing has changed in the set of users between those two points in time,
+// it's still just (Alice,Bob) but then we won't be tracking Charlie -- is this okay though? It's device keys
+// which are only relevant when actively sending events I think? And if Alice does need the keys she knows
+// charlie's (user_id, device_id) so can just hit /keys/query - no need to keep updated about it because she
+// doesn't share any rooms with him.
+// Ergo, we put them in `left` as it is simpler.
+func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
+ newShareUser := "@berta:localhost"
+ newShareUser2 := "@bobby:localhost"
+ roomID := "!join:bar"
+ consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{
+ roomIDToJoinedMembers: map[string][]string{
+ roomID: {newShareUser, newShareUser2},
+ "!another:room": {syncingUser},
+ },
+ }, nil)
+ syncResponse := types.NewResponse()
+ roomEvents := []gomatrixserverlib.ClientEvent{
+ {
+ Type: "m.room.member",
+ StateKey: &syncingUser,
+ EventID: "$something:here",
+ Sender: syncingUser,
+ RoomID: roomID,
+ Content: []byte(`{"membership":"join"}`),
+ },
+ {
+ Type: "m.room.message",
+ EventID: "$something2:here",
+ Sender: syncingUser,
+ RoomID: roomID,
+ Content: []byte(`{"body":"now I leave you"}`),
+ },
+ {
+ Type: "m.room.member",
+ StateKey: &syncingUser,
+ EventID: "$something3:here",
+ Sender: syncingUser,
+ RoomID: roomID,
+ Content: []byte(`{"membership":"leave"}`),
+ },
+ {
+ Type: "m.room.member",
+ StateKey: &syncingUser,
+ EventID: "$something4:here",
+ Sender: syncingUser,
+ RoomID: roomID,
+ Content: []byte(`{"membership":"join"}`),
+ },
+ {
+ Type: "m.room.message",
+ EventID: "$something5:here",
+ Sender: syncingUser,
+ RoomID: roomID,
+ Content: []byte(`{"body":"now I am back, and I leave you for good"}`),
+ },
+ {
+ Type: "m.room.member",
+ StateKey: &syncingUser,
+ EventID: "$something6:here",
+ Sender: syncingUser,
+ RoomID: roomID,
+ Content: []byte(`{"membership":"leave"}`),
+ },
+ }
+
+ lr := syncResponse.Rooms.Leave[roomID]
+ lr.Timeline.Events = roomEvents
+ syncResponse.Rooms.Leave[roomID] = lr
+
+ hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
+ if err != nil {
+ t.Fatalf("Catchup returned an error: %s", err)
+ }
+ assertCatchup(t, hasNew, syncResponse, wantCatchup{
+ hasNew: true,
+ left: []string{newShareUser, newShareUser2},
+ })
+}
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index 019f2e69..7dc02281 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -302,6 +302,10 @@ type Response struct {
ToDevice struct {
Events []gomatrixserverlib.SendToDeviceEvent `json:"events"`
} `json:"to_device"`
+ DeviceLists struct {
+ Changed []string `json:"changed,omitempty"`
+ Left []string `json:"left,omitempty"`
+ } `json:"device_lists,omitempty"`
}
// NewResponse creates an empty response with initialised maps.