diff options
author | Kegsay <kegan@matrix.org> | 2020-09-04 14:25:01 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-04 14:25:01 +0100 |
commit | ca8dcf46b746686e213b184c3ae42ba0be17b46b (patch) | |
tree | 25a1708a15e8ec0cf2ab6d0a574411ce8e31cc87 /syncapi | |
parent | 81688d6bde5e544d11691e5b137eb444a35c9d32 (diff) |
Remove QuerySharedUsers from current state server (#1396)
* Remove QuerySharedUsers from current state server
* Bugfixes
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/consumers/keychange.go | 20 | ||||
-rw-r--r-- | syncapi/internal/keychange.go | 22 | ||||
-rw-r--r-- | syncapi/internal/keychange_test.go | 73 | ||||
-rw-r--r-- | syncapi/sync/requestpool.go | 8 | ||||
-rw-r--r-- | syncapi/syncapi.go | 4 |
5 files changed, 85 insertions, 42 deletions
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 93fa822d..33797378 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -23,6 +23,7 @@ import ( currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" syncinternal "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" syncapi "github.com/matrix-org/dendrite/syncapi/sync" @@ -36,7 +37,8 @@ type OutputKeyChangeEventConsumer struct { keyChangeConsumer *internal.ContinualConsumer db storage.Database serverName gomatrixserverlib.ServerName // our server name - currentStateAPI currentstateAPI.CurrentStateInternalAPI + rsAPI roomserverAPI.RoomserverInternalAPI + stateAPI currentstateAPI.CurrentStateInternalAPI keyAPI api.KeyInternalAPI partitionToOffset map[int32]int64 partitionToOffsetMu sync.Mutex @@ -51,7 +53,8 @@ func NewOutputKeyChangeEventConsumer( kafkaConsumer sarama.Consumer, n *syncapi.Notifier, keyAPI api.KeyInternalAPI, - currentStateAPI currentstateAPI.CurrentStateInternalAPI, + rsAPI roomserverAPI.RoomserverInternalAPI, + stateAPI currentstateAPI.CurrentStateInternalAPI, store storage.Database, ) *OutputKeyChangeEventConsumer { @@ -67,7 +70,8 @@ func NewOutputKeyChangeEventConsumer( db: store, serverName: serverName, keyAPI: keyAPI, - currentStateAPI: currentStateAPI, + rsAPI: rsAPI, + stateAPI: stateAPI, partitionToOffset: make(map[int32]int64), partitionToOffsetMu: sync.Mutex{}, notifier: n, @@ -105,8 +109,8 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er return err } // work out who we need to notify about the new key - var queryRes currentstateAPI.QuerySharedUsersResponse - err := s.currentStateAPI.QuerySharedUsers(context.Background(), ¤tstateAPI.QuerySharedUsersRequest{ + var queryRes roomserverAPI.QuerySharedUsersResponse + err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{ UserID: output.UserID, }, &queryRes) if err != nil { @@ -115,7 +119,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er } // TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{ - syncinternal.DeviceListLogName: &types.LogPosition{ + syncinternal.DeviceListLogName: { Offset: msg.Offset, Partition: msg.Partition, }, @@ -129,7 +133,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) { // work out who we are now sharing rooms with which we previously were not and notify them about the joining // users keys: - changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil) + changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil) if err != nil { log.WithError(err).Error("OnJoinEvent: failed to work out changed users") return @@ -142,7 +146,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) { // work out who we are no longer sharing any rooms with and notify them about the leaving user - _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), nil, []string{ev.RoomID()}) + _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), nil, []string{ev.RoomID()}) if err != nil { log.WithError(err).Error("OnLeaveEvent: failed to work out left users") return diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index 7d127aa8..f2f50aef 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -22,6 +22,7 @@ import ( currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -48,7 +49,8 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID, // be already filled in with join/leave information. // nolint:gocyclo func DeviceListCatchup( - ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, + ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, + stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, res *types.Response, from, to types.StreamingToken, ) (hasNew bool, err error) { @@ -56,7 +58,7 @@ func DeviceListCatchup( newlyJoinedRooms := joinedRooms(res, userID) newlyLeftRooms := leftRooms(res) if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 { - changed, left, err := TrackChangedUsers(ctx, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms) + changed, left, err := TrackChangedUsers(ctx, rsAPI, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms) if err != nil { return false, err } @@ -97,7 +99,7 @@ func DeviceListCatchup( } // QueryKeyChanges gets ALL users who have changed keys, we want the ones who share rooms with the user. var sharedUsersMap map[string]int - sharedUsersMap, queryRes.UserIDs = filterSharedUsers(ctx, stateAPI, userID, queryRes.UserIDs) + sharedUsersMap, queryRes.UserIDs = filterSharedUsers(ctx, rsAPI, userID, queryRes.UserIDs) util.GetLogger(ctx).Debugf( "QueryKeyChanges request p=%d,off=%d,to=%d response p=%d off=%d uids=%v", partition, offset, toOffset, queryRes.Partition, queryRes.Offset, queryRes.UserIDs, @@ -142,7 +144,7 @@ func DeviceListCatchup( // TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response. // nolint:gocyclo func TrackChangedUsers( - ctx context.Context, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string, + ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string, ) (changed, left []string, err error) { // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users. @@ -151,8 +153,8 @@ func TrackChangedUsers( // - Get users in newly left room. - QueryCurrentState // - Loop set of users and decrement by 1 for each user in newly left room. // - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync. - var queryRes currentstateAPI.QuerySharedUsersResponse - err = stateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ + var queryRes roomserverAPI.QuerySharedUsersResponse + err = rsAPI.QuerySharedUsers(ctx, &roomserverAPI.QuerySharedUsersRequest{ UserID: userID, IncludeRoomIDs: newlyLeftRooms, }, &queryRes) @@ -193,7 +195,7 @@ func TrackChangedUsers( // - Loop set of users in newly joined room, do they appear in the set of users prior to joining? // - If yes: then they already shared a room in common, do nothing. // - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]' - err = stateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ + err = rsAPI.QuerySharedUsers(ctx, &roomserverAPI.QuerySharedUsersRequest{ UserID: userID, ExcludeRoomIDs: newlyJoinedRooms, }, &queryRes) @@ -228,11 +230,11 @@ func TrackChangedUsers( } func filterSharedUsers( - ctx context.Context, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, usersWithChangedKeys []string, + ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, usersWithChangedKeys []string, ) (map[string]int, []string) { var result []string - var sharedUsersRes currentstateAPI.QuerySharedUsersResponse - err := stateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ + var sharedUsersRes roomserverAPI.QuerySharedUsersResponse + err := rsAPI.QuerySharedUsers(ctx, &roomserverAPI.QuerySharedUsersRequest{ UserID: userID, }, &sharedUsersRes) if err != nil { diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index 03ec4e96..ee6ca1f4 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -7,8 +7,9 @@ import ( "testing" "github.com/Shopify/sarama" - "github.com/matrix-org/dendrite/currentstateserver/api" + stateapi "github.com/matrix-org/dendrite/currentstateserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -49,17 +50,18 @@ func (k *mockKeyAPI) InputDeviceListUpdate(ctx context.Context, req *keyapi.Inpu } -type mockCurrentStateAPI struct { +type mockRoomserverAPI struct { + api.RoomserverInternalAPITrace roomIDToJoinedMembers map[string][]string } // QueryRoomsForUser retrieves a list of room IDs matching the given query. -func (s *mockCurrentStateAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error { +func (s *mockRoomserverAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error { return nil } // QueryBulkStateContent does a bulk query for state event content in the given rooms. -func (s *mockCurrentStateAPI) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkStateContentRequest, res *api.QueryBulkStateContentResponse) error { +func (s *mockRoomserverAPI) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkStateContentRequest, res *api.QueryBulkStateContentResponse) error { res.Rooms = make(map[string]map[gomatrixserverlib.StateKeyTuple]string) if req.AllowWildcards && len(req.StateTuples) == 1 && req.StateTuples[0].EventType == gomatrixserverlib.MRoomMember && req.StateTuples[0].StateKey == "*" { for _, roomID := range req.RoomIDs { @@ -76,7 +78,7 @@ func (s *mockCurrentStateAPI) QueryBulkStateContent(ctx context.Context, req *ap } // QuerySharedUsers returns a list of users who share at least 1 room in common with the given user. -func (s *mockCurrentStateAPI) QuerySharedUsers(ctx context.Context, req *api.QuerySharedUsersRequest, res *api.QuerySharedUsersResponse) error { +func (s *mockRoomserverAPI) QuerySharedUsers(ctx context.Context, req *api.QuerySharedUsersRequest, res *api.QuerySharedUsersResponse) error { roomsToQuery := req.IncludeRoomIDs for roomID, members := range s.roomIDToJoinedMembers { exclude := false @@ -106,6 +108,30 @@ func (s *mockCurrentStateAPI) QuerySharedUsers(ctx context.Context, req *api.Que return nil } +type mockStateAPI struct { + rsAPI *mockRoomserverAPI +} + +// QueryRoomsForUser retrieves a list of room IDs matching the given query. +func (s *mockStateAPI) QueryRoomsForUser(ctx context.Context, req *stateapi.QueryRoomsForUserRequest, res *stateapi.QueryRoomsForUserResponse) error { + return nil +} + +// QueryBulkStateContent does a bulk query for state event content in the given rooms. +func (s *mockStateAPI) QueryBulkStateContent(ctx context.Context, req *stateapi.QueryBulkStateContentRequest, res *stateapi.QueryBulkStateContentResponse) error { + var res2 api.QueryBulkStateContentResponse + err := s.rsAPI.QueryBulkStateContent(ctx, &api.QueryBulkStateContentRequest{ + RoomIDs: req.RoomIDs, + AllowWildcards: req.AllowWildcards, + StateTuples: req.StateTuples, + }, &res2) + if err != nil { + return err + } + res.Rooms = res2.Rooms + return nil +} + type wantCatchup struct { hasNew bool changed []string @@ -173,12 +199,13 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { syncResponse := types.NewResponse() syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + rsAPI := &mockRoomserverAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, newShareUser}, "!another:room": {syncingUser}, }, - }, syncingUser, syncResponse, emptyToken, newestToken) + } + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -195,12 +222,13 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { syncResponse := types.NewResponse() syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + rsAPI := &mockRoomserverAPI{ roomIDToJoinedMembers: map[string][]string{ newlyLeftRoom: {removeUser}, "!another:room": {syncingUser}, }, - }, syncingUser, syncResponse, emptyToken, newestToken) + } + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -217,12 +245,13 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { syncResponse := types.NewResponse() syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + rsAPI := &mockRoomserverAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, existingUser}, "!another:room": {syncingUser, existingUser}, }, - }, syncingUser, syncResponse, emptyToken, newestToken) + } + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -238,12 +267,13 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { syncResponse := types.NewResponse() syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + rsAPI := &mockRoomserverAPI{ roomIDToJoinedMembers: map[string][]string{ newlyLeftRoom: {existingUser}, "!another:room": {syncingUser, existingUser}, }, - }, syncingUser, syncResponse, emptyToken, newestToken) + } + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -297,11 +327,12 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { jr.Timeline.Events = roomTimelineEvents syncResponse.Rooms.Join[roomID] = jr - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + rsAPI := &mockRoomserverAPI{ roomIDToJoinedMembers: map[string][]string{ roomID: {syncingUser, existingUser}, }, - }, syncingUser, syncResponse, emptyToken, newestToken) + } + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -322,13 +353,14 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) { syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + rsAPI := &mockRoomserverAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2}, newlyLeftRoom: {newlyLeftUser, newlyLeftUser2}, "!another:room": {syncingUser}, }, - }, syncingUser, syncResponse, emptyToken, newestToken) + } + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -407,12 +439,15 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { lr.Timeline.Events = roomEvents syncResponse.Rooms.Leave[roomID] = lr - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + rsAPI := &mockRoomserverAPI{ roomIDToJoinedMembers: map[string][]string{ roomID: {newShareUser, newShareUser2}, "!another:room": {syncingUser}, }, - }, syncingUser, syncResponse, emptyToken, newestToken) + } + hasNew, err := DeviceListCatchup( + context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken, + ) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 357df240..2859da71 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -25,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/jsonerror" currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" @@ -40,15 +41,16 @@ type RequestPool struct { userAPI userapi.UserInternalAPI notifier *Notifier keyAPI keyapi.KeyInternalAPI + rsAPI roomserverAPI.RoomserverInternalAPI stateAPI currentstateAPI.CurrentStateInternalAPI } // NewRequestPool makes a new RequestPool func NewRequestPool( db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI, - stateAPI currentstateAPI.CurrentStateInternalAPI, + rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, ) *RequestPool { - return &RequestPool{db, userAPI, n, keyAPI, stateAPI} + return &RequestPool{db, userAPI, n, keyAPI, rsAPI, stateAPI} } // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be @@ -265,7 +267,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea func (rp *RequestPool) appendDeviceLists( data *types.Response, userID string, since, to types.StreamingToken, ) (*types.Response, error) { - _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since, to) + _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, rp.stateAPI, userID, data, since, to) if err != nil { return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err) } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 0f4ea828..634bd1ee 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -62,11 +62,11 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start notifier") } - requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI) + requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, rsAPI, currentStateAPI) keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), - consumer, notifier, keyAPI, currentStateAPI, syncDB, + consumer, notifier, keyAPI, rsAPI, currentStateAPI, syncDB, ) if err = keyChangeConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start key change consumer") |