aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-07-30 18:00:56 +0100
committerGitHub <noreply@github.com>2020-07-30 18:00:56 +0100
commita7e67e65a8662387f1a5ba6860698743f9dbd60f (patch)
tree90714c83c20fee10ee3c758f3ba00b7f9eee6d1c /syncapi
parent292a9ddd82a7cfc64ed43b70454040fb009601a7 (diff)
Notify clients when devices are deleted (#1233)
* Recheck device lists when join/leave events come in * Add PerformDeviceDeletion * Notify clients when devices are deleted * Unbreak things * Remove debug logging
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/roomserver.go23
-rw-r--r--syncapi/internal/keychange_test.go3
-rw-r--r--syncapi/syncapi.go18
3 files changed, 35 insertions, 9 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index da4a5366..f8cdcd5c 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -35,6 +35,7 @@ type OutputRoomEventConsumer struct {
rsConsumer *internal.ContinualConsumer
db storage.Database
notifier *sync.Notifier
+ keyChanges *OutputKeyChangeEventConsumer
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@@ -44,6 +45,7 @@ func NewOutputRoomEventConsumer(
n *sync.Notifier,
store storage.Database,
rsAPI api.RoomserverInternalAPI,
+ keyChanges *OutputKeyChangeEventConsumer,
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
@@ -56,6 +58,7 @@ func NewOutputRoomEventConsumer(
db: store,
notifier: n,
rsAPI: rsAPI,
+ keyChanges: keyChanges,
}
consumer.ProcessMessage = s.onMessage
@@ -160,9 +163,29 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}
s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
+ s.notifyKeyChanges(&ev)
+
return nil
}
+func (s *OutputRoomEventConsumer) notifyKeyChanges(ev *gomatrixserverlib.HeaderedEvent) {
+ if ev.Type() != gomatrixserverlib.MRoomMember || ev.StateKey() == nil {
+ return
+ }
+ membership, err := ev.Membership()
+ if err != nil {
+ return
+ }
+ switch membership {
+ case gomatrixserverlib.Join:
+ s.keyChanges.OnJoinEvent(ev)
+ case gomatrixserverlib.Ban:
+ fallthrough
+ case gomatrixserverlib.Leave:
+ s.keyChanges.OnLeaveEvent(ev)
+ }
+}
+
func (s *OutputRoomEventConsumer) onNewInviteEvent(
ctx context.Context, msg api.OutputNewInviteEvent,
) error {
diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go
index 3f18696c..2c3d154d 100644
--- a/syncapi/internal/keychange_test.go
+++ b/syncapi/internal/keychange_test.go
@@ -10,6 +10,7 @@ import (
"github.com/matrix-org/dendrite/currentstateserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -29,6 +30,8 @@ type mockKeyAPI struct{}
func (k *mockKeyAPI) PerformUploadKeys(ctx context.Context, req *keyapi.PerformUploadKeysRequest, res *keyapi.PerformUploadKeysResponse) {
}
+func (k *mockKeyAPI) SetUserAPI(i userapi.UserInternalAPI) {}
+
// PerformClaimKeys claims one-time keys for use in pre-key messages
func (k *mockKeyAPI) PerformClaimKeys(ctx context.Context, req *keyapi.PerformClaimKeysRequest, res *keyapi.PerformClaimKeysResponse) {
}
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 754cd502..5198d59b 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -64,8 +64,16 @@ func AddPublicRoutes(
requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI)
+ keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
+ cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent),
+ consumer, notifier, keyAPI, currentStateAPI, syncDB,
+ )
+ if err = keyChangeConsumer.Start(); err != nil {
+ logrus.WithError(err).Panicf("failed to start key change consumer")
+ }
+
roomConsumer := consumers.NewOutputRoomEventConsumer(
- cfg, consumer, notifier, syncDB, rsAPI,
+ cfg, consumer, notifier, syncDB, rsAPI, keyChangeConsumer,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
@@ -92,13 +100,5 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
}
- keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
- cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent),
- consumer, notifier, keyAPI, currentStateAPI, syncDB,
- )
- if err = keyChangeConsumer.Start(); err != nil {
- logrus.WithError(err).Panicf("failed to start key change consumer")
- }
-
routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg)
}