aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-09-04 14:25:01 +0100
committerGitHub <noreply@github.com>2020-09-04 14:25:01 +0100
commitca8dcf46b746686e213b184c3ae42ba0be17b46b (patch)
tree25a1708a15e8ec0cf2ab6d0a574411ce8e31cc87 /syncapi
parent81688d6bde5e544d11691e5b137eb444a35c9d32 (diff)
Remove QuerySharedUsers from current state server (#1396)
* Remove QuerySharedUsers from current state server * Bugfixes
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/keychange.go20
-rw-r--r--syncapi/internal/keychange.go22
-rw-r--r--syncapi/internal/keychange_test.go73
-rw-r--r--syncapi/sync/requestpool.go8
-rw-r--r--syncapi/syncapi.go4
5 files changed, 85 insertions, 42 deletions
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index 93fa822d..33797378 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -23,6 +23,7 @@ import (
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
+ roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
syncinternal "github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
syncapi "github.com/matrix-org/dendrite/syncapi/sync"
@@ -36,7 +37,8 @@ type OutputKeyChangeEventConsumer struct {
keyChangeConsumer *internal.ContinualConsumer
db storage.Database
serverName gomatrixserverlib.ServerName // our server name
- currentStateAPI currentstateAPI.CurrentStateInternalAPI
+ rsAPI roomserverAPI.RoomserverInternalAPI
+ stateAPI currentstateAPI.CurrentStateInternalAPI
keyAPI api.KeyInternalAPI
partitionToOffset map[int32]int64
partitionToOffsetMu sync.Mutex
@@ -51,7 +53,8 @@ func NewOutputKeyChangeEventConsumer(
kafkaConsumer sarama.Consumer,
n *syncapi.Notifier,
keyAPI api.KeyInternalAPI,
- currentStateAPI currentstateAPI.CurrentStateInternalAPI,
+ rsAPI roomserverAPI.RoomserverInternalAPI,
+ stateAPI currentstateAPI.CurrentStateInternalAPI,
store storage.Database,
) *OutputKeyChangeEventConsumer {
@@ -67,7 +70,8 @@ func NewOutputKeyChangeEventConsumer(
db: store,
serverName: serverName,
keyAPI: keyAPI,
- currentStateAPI: currentStateAPI,
+ rsAPI: rsAPI,
+ stateAPI: stateAPI,
partitionToOffset: make(map[int32]int64),
partitionToOffsetMu: sync.Mutex{},
notifier: n,
@@ -105,8 +109,8 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
return err
}
// work out who we need to notify about the new key
- var queryRes currentstateAPI.QuerySharedUsersResponse
- err := s.currentStateAPI.QuerySharedUsers(context.Background(), &currentstateAPI.QuerySharedUsersRequest{
+ var queryRes roomserverAPI.QuerySharedUsersResponse
+ err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{
UserID: output.UserID,
}, &queryRes)
if err != nil {
@@ -115,7 +119,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
}
// TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams
posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{
- syncinternal.DeviceListLogName: &types.LogPosition{
+ syncinternal.DeviceListLogName: {
Offset: msg.Offset,
Partition: msg.Partition,
},
@@ -129,7 +133,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are now sharing rooms with which we previously were not and notify them about the joining
// users keys:
- changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
+ changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
if err != nil {
log.WithError(err).Error("OnJoinEvent: failed to work out changed users")
return
@@ -142,7 +146,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere
func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are no longer sharing any rooms with and notify them about the leaving user
- _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
+ _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
if err != nil {
log.WithError(err).Error("OnLeaveEvent: failed to work out left users")
return
diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go
index 7d127aa8..f2f50aef 100644
--- a/syncapi/internal/keychange.go
+++ b/syncapi/internal/keychange.go
@@ -22,6 +22,7 @@ import (
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/keyserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
+ roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@@ -48,7 +49,8 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID,
// be already filled in with join/leave information.
// nolint:gocyclo
func DeviceListCatchup(
- ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
+ ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
+ stateAPI currentstateAPI.CurrentStateInternalAPI,
userID string, res *types.Response, from, to types.StreamingToken,
) (hasNew bool, err error) {
@@ -56,7 +58,7 @@ func DeviceListCatchup(
newlyJoinedRooms := joinedRooms(res, userID)
newlyLeftRooms := leftRooms(res)
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
- changed, left, err := TrackChangedUsers(ctx, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms)
+ changed, left, err := TrackChangedUsers(ctx, rsAPI, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms)
if err != nil {
return false, err
}
@@ -97,7 +99,7 @@ func DeviceListCatchup(
}
// QueryKeyChanges gets ALL users who have changed keys, we want the ones who share rooms with the user.
var sharedUsersMap map[string]int
- sharedUsersMap, queryRes.UserIDs = filterSharedUsers(ctx, stateAPI, userID, queryRes.UserIDs)
+ sharedUsersMap, queryRes.UserIDs = filterSharedUsers(ctx, rsAPI, userID, queryRes.UserIDs)
util.GetLogger(ctx).Debugf(
"QueryKeyChanges request p=%d,off=%d,to=%d response p=%d off=%d uids=%v",
partition, offset, toOffset, queryRes.Partition, queryRes.Offset, queryRes.UserIDs,
@@ -142,7 +144,7 @@ func DeviceListCatchup(
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
// nolint:gocyclo
func TrackChangedUsers(
- ctx context.Context, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
+ ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
) (changed, left []string, err error) {
// process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
@@ -151,8 +153,8 @@ func TrackChangedUsers(
// - Get users in newly left room. - QueryCurrentState
// - Loop set of users and decrement by 1 for each user in newly left room.
// - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync.
- var queryRes currentstateAPI.QuerySharedUsersResponse
- err = stateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
+ var queryRes roomserverAPI.QuerySharedUsersResponse
+ err = rsAPI.QuerySharedUsers(ctx, &roomserverAPI.QuerySharedUsersRequest{
UserID: userID,
IncludeRoomIDs: newlyLeftRooms,
}, &queryRes)
@@ -193,7 +195,7 @@ func TrackChangedUsers(
// - Loop set of users in newly joined room, do they appear in the set of users prior to joining?
// - If yes: then they already shared a room in common, do nothing.
// - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]'
- err = stateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
+ err = rsAPI.QuerySharedUsers(ctx, &roomserverAPI.QuerySharedUsersRequest{
UserID: userID,
ExcludeRoomIDs: newlyJoinedRooms,
}, &queryRes)
@@ -228,11 +230,11 @@ func TrackChangedUsers(
}
func filterSharedUsers(
- ctx context.Context, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, usersWithChangedKeys []string,
+ ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, usersWithChangedKeys []string,
) (map[string]int, []string) {
var result []string
- var sharedUsersRes currentstateAPI.QuerySharedUsersResponse
- err := stateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
+ var sharedUsersRes roomserverAPI.QuerySharedUsersResponse
+ err := rsAPI.QuerySharedUsers(ctx, &roomserverAPI.QuerySharedUsersRequest{
UserID: userID,
}, &sharedUsersRes)
if err != nil {
diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go
index 03ec4e96..ee6ca1f4 100644
--- a/syncapi/internal/keychange_test.go
+++ b/syncapi/internal/keychange_test.go
@@ -7,8 +7,9 @@ import (
"testing"
"github.com/Shopify/sarama"
- "github.com/matrix-org/dendrite/currentstateserver/api"
+ stateapi "github.com/matrix-org/dendrite/currentstateserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@@ -49,17 +50,18 @@ func (k *mockKeyAPI) InputDeviceListUpdate(ctx context.Context, req *keyapi.Inpu
}
-type mockCurrentStateAPI struct {
+type mockRoomserverAPI struct {
+ api.RoomserverInternalAPITrace
roomIDToJoinedMembers map[string][]string
}
// QueryRoomsForUser retrieves a list of room IDs matching the given query.
-func (s *mockCurrentStateAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error {
+func (s *mockRoomserverAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error {
return nil
}
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
-func (s *mockCurrentStateAPI) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkStateContentRequest, res *api.QueryBulkStateContentResponse) error {
+func (s *mockRoomserverAPI) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkStateContentRequest, res *api.QueryBulkStateContentResponse) error {
res.Rooms = make(map[string]map[gomatrixserverlib.StateKeyTuple]string)
if req.AllowWildcards && len(req.StateTuples) == 1 && req.StateTuples[0].EventType == gomatrixserverlib.MRoomMember && req.StateTuples[0].StateKey == "*" {
for _, roomID := range req.RoomIDs {
@@ -76,7 +78,7 @@ func (s *mockCurrentStateAPI) QueryBulkStateContent(ctx context.Context, req *ap
}
// QuerySharedUsers returns a list of users who share at least 1 room in common with the given user.
-func (s *mockCurrentStateAPI) QuerySharedUsers(ctx context.Context, req *api.QuerySharedUsersRequest, res *api.QuerySharedUsersResponse) error {
+func (s *mockRoomserverAPI) QuerySharedUsers(ctx context.Context, req *api.QuerySharedUsersRequest, res *api.QuerySharedUsersResponse) error {
roomsToQuery := req.IncludeRoomIDs
for roomID, members := range s.roomIDToJoinedMembers {
exclude := false
@@ -106,6 +108,30 @@ func (s *mockCurrentStateAPI) QuerySharedUsers(ctx context.Context, req *api.Que
return nil
}
+type mockStateAPI struct {
+ rsAPI *mockRoomserverAPI
+}
+
+// QueryRoomsForUser retrieves a list of room IDs matching the given query.
+func (s *mockStateAPI) QueryRoomsForUser(ctx context.Context, req *stateapi.QueryRoomsForUserRequest, res *stateapi.QueryRoomsForUserResponse) error {
+ return nil
+}
+
+// QueryBulkStateContent does a bulk query for state event content in the given rooms.
+func (s *mockStateAPI) QueryBulkStateContent(ctx context.Context, req *stateapi.QueryBulkStateContentRequest, res *stateapi.QueryBulkStateContentResponse) error {
+ var res2 api.QueryBulkStateContentResponse
+ err := s.rsAPI.QueryBulkStateContent(ctx, &api.QueryBulkStateContentRequest{
+ RoomIDs: req.RoomIDs,
+ AllowWildcards: req.AllowWildcards,
+ StateTuples: req.StateTuples,
+ }, &res2)
+ if err != nil {
+ return err
+ }
+ res.Rooms = res2.Rooms
+ return nil
+}
+
type wantCatchup struct {
hasNew bool
changed []string
@@ -173,12 +199,13 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ rsAPI := &mockRoomserverAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser},
"!another:room": {syncingUser},
},
- }, syncingUser, syncResponse, emptyToken, newestToken)
+ }
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -195,12 +222,13 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ rsAPI := &mockRoomserverAPI{
roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {removeUser},
"!another:room": {syncingUser},
},
- }, syncingUser, syncResponse, emptyToken, newestToken)
+ }
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -217,12 +245,13 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ rsAPI := &mockRoomserverAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, existingUser},
"!another:room": {syncingUser, existingUser},
},
- }, syncingUser, syncResponse, emptyToken, newestToken)
+ }
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -238,12 +267,13 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ rsAPI := &mockRoomserverAPI{
roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {existingUser},
"!another:room": {syncingUser, existingUser},
},
- }, syncingUser, syncResponse, emptyToken, newestToken)
+ }
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -297,11 +327,12 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
jr.Timeline.Events = roomTimelineEvents
syncResponse.Rooms.Join[roomID] = jr
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ rsAPI := &mockRoomserverAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {syncingUser, existingUser},
},
- }, syncingUser, syncResponse, emptyToken, newestToken)
+ }
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -322,13 +353,14 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ rsAPI := &mockRoomserverAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2},
newlyLeftRoom: {newlyLeftUser, newlyLeftUser2},
"!another:room": {syncingUser},
},
- }, syncingUser, syncResponse, emptyToken, newestToken)
+ }
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -407,12 +439,15 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
lr.Timeline.Events = roomEvents
syncResponse.Rooms.Leave[roomID] = lr
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ rsAPI := &mockRoomserverAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {newShareUser, newShareUser2},
"!another:room": {syncingUser},
},
- }, syncingUser, syncResponse, emptyToken, newestToken)
+ }
+ hasNew, err := DeviceListCatchup(
+ context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken,
+ )
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index 357df240..2859da71 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
+ roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -40,15 +41,16 @@ type RequestPool struct {
userAPI userapi.UserInternalAPI
notifier *Notifier
keyAPI keyapi.KeyInternalAPI
+ rsAPI roomserverAPI.RoomserverInternalAPI
stateAPI currentstateAPI.CurrentStateInternalAPI
}
// NewRequestPool makes a new RequestPool
func NewRequestPool(
db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
- stateAPI currentstateAPI.CurrentStateInternalAPI,
+ rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
) *RequestPool {
- return &RequestPool{db, userAPI, n, keyAPI, stateAPI}
+ return &RequestPool{db, userAPI, n, keyAPI, rsAPI, stateAPI}
}
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
@@ -265,7 +267,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
func (rp *RequestPool) appendDeviceLists(
data *types.Response, userID string, since, to types.StreamingToken,
) (*types.Response, error) {
- _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since, to)
+ _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, rp.stateAPI, userID, data, since, to)
if err != nil {
return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err)
}
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 0f4ea828..634bd1ee 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -62,11 +62,11 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start notifier")
}
- requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI)
+ requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, rsAPI, currentStateAPI)
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
- consumer, notifier, keyAPI, currentStateAPI, syncDB,
+ consumer, notifier, keyAPI, rsAPI, currentStateAPI, syncDB,
)
if err = keyChangeConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start key change consumer")