diff options
author | Kegsay <kegan@matrix.org> | 2020-07-30 18:00:56 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-30 18:00:56 +0100 |
commit | a7e67e65a8662387f1a5ba6860698743f9dbd60f (patch) | |
tree | 90714c83c20fee10ee3c758f3ba00b7f9eee6d1c /syncapi | |
parent | 292a9ddd82a7cfc64ed43b70454040fb009601a7 (diff) |
Notify clients when devices are deleted (#1233)
* Recheck device lists when join/leave events come in
* Add PerformDeviceDeletion
* Notify clients when devices are deleted
* Unbreak things
* Remove debug logging
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/consumers/roomserver.go | 23 | ||||
-rw-r--r-- | syncapi/internal/keychange_test.go | 3 | ||||
-rw-r--r-- | syncapi/syncapi.go | 18 |
3 files changed, 35 insertions, 9 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index da4a5366..f8cdcd5c 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -35,6 +35,7 @@ type OutputRoomEventConsumer struct { rsConsumer *internal.ContinualConsumer db storage.Database notifier *sync.Notifier + keyChanges *OutputKeyChangeEventConsumer } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. @@ -44,6 +45,7 @@ func NewOutputRoomEventConsumer( n *sync.Notifier, store storage.Database, rsAPI api.RoomserverInternalAPI, + keyChanges *OutputKeyChangeEventConsumer, ) *OutputRoomEventConsumer { consumer := internal.ContinualConsumer{ @@ -56,6 +58,7 @@ func NewOutputRoomEventConsumer( db: store, notifier: n, rsAPI: rsAPI, + keyChanges: keyChanges, } consumer.ProcessMessage = s.onMessage @@ -160,9 +163,29 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( } s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil)) + s.notifyKeyChanges(&ev) + return nil } +func (s *OutputRoomEventConsumer) notifyKeyChanges(ev *gomatrixserverlib.HeaderedEvent) { + if ev.Type() != gomatrixserverlib.MRoomMember || ev.StateKey() == nil { + return + } + membership, err := ev.Membership() + if err != nil { + return + } + switch membership { + case gomatrixserverlib.Join: + s.keyChanges.OnJoinEvent(ev) + case gomatrixserverlib.Ban: + fallthrough + case gomatrixserverlib.Leave: + s.keyChanges.OnLeaveEvent(ev) + } +} + func (s *OutputRoomEventConsumer) onNewInviteEvent( ctx context.Context, msg api.OutputNewInviteEvent, ) error { diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index 3f18696c..2c3d154d 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -10,6 +10,7 @@ import ( "github.com/matrix-org/dendrite/currentstateserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" ) @@ -29,6 +30,8 @@ type mockKeyAPI struct{} func (k *mockKeyAPI) PerformUploadKeys(ctx context.Context, req *keyapi.PerformUploadKeysRequest, res *keyapi.PerformUploadKeysResponse) { } +func (k *mockKeyAPI) SetUserAPI(i userapi.UserInternalAPI) {} + // PerformClaimKeys claims one-time keys for use in pre-key messages func (k *mockKeyAPI) PerformClaimKeys(ctx context.Context, req *keyapi.PerformClaimKeysRequest, res *keyapi.PerformClaimKeysResponse) { } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 754cd502..5198d59b 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -64,8 +64,16 @@ func AddPublicRoutes( requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI) + keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( + cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent), + consumer, notifier, keyAPI, currentStateAPI, syncDB, + ) + if err = keyChangeConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start key change consumer") + } + roomConsumer := consumers.NewOutputRoomEventConsumer( - cfg, consumer, notifier, syncDB, rsAPI, + cfg, consumer, notifier, syncDB, rsAPI, keyChangeConsumer, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer") @@ -92,13 +100,5 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start send-to-device consumer") } - keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( - cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent), - consumer, notifier, keyAPI, currentStateAPI, syncDB, - ) - if err = keyChangeConsumer.Start(); err != nil { - logrus.WithError(err).Panicf("failed to start key change consumer") - } - routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg) } |