diff options
author | Kegsay <kegan@matrix.org> | 2020-06-30 13:34:59 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-06-30 13:34:59 +0100 |
commit | 6f49758b90d655d9c2bb9170da2ea1d0a2bdd664 (patch) | |
tree | f434cc48f27c3915fffdb39c5b907fd021d4ff61 /clientapi | |
parent | ca5bbffd8d987b220c8f8eb888a2fc9b9cef104c (diff) |
Remove membership table from account DB (#1172)
* Remove membership table from account DB
And make code which needs that data use the currentstate server
* Unbreak tests; use a membership enum for space
Diffstat (limited to 'clientapi')
-rw-r--r-- | clientapi/clientapi.go | 14 | ||||
-rw-r--r-- | clientapi/consumers/roomserver.go | 92 | ||||
-rw-r--r-- | clientapi/routing/memberships.go | 20 | ||||
-rw-r--r-- | clientapi/routing/profile.go | 37 | ||||
-rw-r--r-- | clientapi/routing/routing.go | 10 | ||||
-rw-r--r-- | clientapi/routing/sendtyping.go | 36 |
6 files changed, 67 insertions, 142 deletions
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 174eb1bf..8ea84249 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -18,9 +18,9 @@ import ( "github.com/Shopify/sarama" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" - "github.com/matrix-org/dendrite/clientapi/consumers" "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/routing" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" @@ -30,14 +30,12 @@ import ( "github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/devices" "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" ) // AddPublicRoutes sets up and registers HTTP handlers for the ClientAPI component. func AddPublicRoutes( router *mux.Router, cfg *config.Dendrite, - consumer sarama.Consumer, producer sarama.SyncProducer, deviceDB devices.Database, accountsDB accounts.Database, @@ -45,6 +43,7 @@ func AddPublicRoutes( rsAPI roomserverAPI.RoomserverInternalAPI, eduInputAPI eduServerAPI.EDUServerInputAPI, asAPI appserviceAPI.AppServiceQueryAPI, + stateAPI currentstateAPI.CurrentStateInternalAPI, transactionsCache *transactions.Cache, fsAPI federationSenderAPI.FederationSenderInternalAPI, userAPI userapi.UserInternalAPI, @@ -54,16 +53,9 @@ func AddPublicRoutes( Topic: string(cfg.Kafka.Topics.OutputClientData), } - roomEventConsumer := consumers.NewOutputRoomEventConsumer( - cfg, consumer, accountsDB, rsAPI, - ) - if err := roomEventConsumer.Start(); err != nil { - logrus.WithError(err).Panicf("failed to start room server consumer") - } - routing.Setup( router, cfg, eduInputAPI, rsAPI, asAPI, accountsDB, deviceDB, userAPI, federation, - syncProducer, transactionsCache, fsAPI, + syncProducer, transactionsCache, fsAPI, stateAPI, ) } diff --git a/clientapi/consumers/roomserver.go b/clientapi/consumers/roomserver.go deleted file mode 100644 index beeda042..00000000 --- a/clientapi/consumers/roomserver.go +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package consumers - -import ( - "context" - "encoding/json" - - "github.com/matrix-org/dendrite/internal" - "github.com/matrix-org/dendrite/internal/config" - "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/userapi/storage/accounts" - "github.com/matrix-org/gomatrixserverlib" - - "github.com/Shopify/sarama" - log "github.com/sirupsen/logrus" -) - -// OutputRoomEventConsumer consumes events that originated in the room server. -type OutputRoomEventConsumer struct { - rsAPI api.RoomserverInternalAPI - rsConsumer *internal.ContinualConsumer - db accounts.Database - serverName string -} - -// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. -func NewOutputRoomEventConsumer( - cfg *config.Dendrite, - kafkaConsumer sarama.Consumer, - store accounts.Database, - rsAPI api.RoomserverInternalAPI, -) *OutputRoomEventConsumer { - - consumer := internal.ContinualConsumer{ - Topic: string(cfg.Kafka.Topics.OutputRoomEvent), - Consumer: kafkaConsumer, - PartitionStore: store, - } - s := &OutputRoomEventConsumer{ - rsConsumer: &consumer, - db: store, - rsAPI: rsAPI, - serverName: string(cfg.Matrix.ServerName), - } - consumer.ProcessMessage = s.onMessage - - return s -} - -// Start consuming from room servers -func (s *OutputRoomEventConsumer) Start() error { - return s.rsConsumer.Start() -} - -// onMessage is called when the sync server receives a new event from the room server output log. -// It is not safe for this function to be called from multiple goroutines, or else the -// sync stream position may race and be incorrectly calculated. -func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { - // Parse out the event JSON - var output api.OutputEvent - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil - } - - if output.Type != api.OutputTypeNewRoomEvent { - log.WithField("type", output.Type).Debug( - "roomserver output log: ignoring unknown output type", - ) - return nil - } - - return s.db.UpdateMemberships( - context.TODO(), - gomatrixserverlib.UnwrapEventHeaders(output.NewRoomEvent.AddsState()), - output.NewRoomEvent.RemovesStateEventIDs, - ) -} diff --git a/clientapi/routing/memberships.go b/clientapi/routing/memberships.go index 1c9800b6..9c4cf749 100644 --- a/clientapi/routing/memberships.go +++ b/clientapi/routing/memberships.go @@ -18,9 +18,8 @@ import ( "encoding/json" "net/http" - "github.com/matrix-org/dendrite/userapi/storage/accounts" - "github.com/matrix-org/dendrite/clientapi/jsonerror" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -95,20 +94,19 @@ func GetMemberships( func GetJoinedRooms( req *http.Request, device *userapi.Device, - accountsDB accounts.Database, + stateAPI currentstateAPI.CurrentStateInternalAPI, ) util.JSONResponse { - localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID) - if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed") - return jsonerror.InternalServerError() - } - joinedRooms, err := accountsDB.GetRoomIDsByLocalPart(req.Context(), localpart) + var res currentstateAPI.QueryRoomsForUserResponse + err := stateAPI.QueryRoomsForUser(req.Context(), ¤tstateAPI.QueryRoomsForUserRequest{ + UserID: device.UserID, + WantMembership: "join", + }, &res) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("accountsDB.GetRoomIDsByLocalPart failed") + util.GetLogger(req.Context()).WithError(err).Error("QueryRoomsForUser failed") return jsonerror.InternalServerError() } return util.JSONResponse{ Code: http.StatusOK, - JSON: getJoinedRoomsResponse{joinedRooms}, + JSON: getJoinedRoomsResponse{res.RoomIDs}, } } diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go index 7c2cd19b..1df4c9b3 100644 --- a/clientapi/routing/profile.go +++ b/clientapi/routing/profile.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" @@ -93,8 +94,8 @@ func GetAvatarURL( // SetAvatarURL implements PUT /profile/{userID}/avatar_url // nolint:gocyclo func SetAvatarURL( - req *http.Request, accountDB accounts.Database, device *userapi.Device, - userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, + req *http.Request, accountDB accounts.Database, stateAPI currentstateAPI.CurrentStateInternalAPI, + device *userapi.Device, userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, ) util.JSONResponse { if userID != device.UserID { return util.JSONResponse{ @@ -139,9 +140,13 @@ func SetAvatarURL( return jsonerror.InternalServerError() } - memberships, err := accountDB.GetMembershipsByLocalpart(req.Context(), localpart) + var res currentstateAPI.QueryRoomsForUserResponse + err = stateAPI.QueryRoomsForUser(req.Context(), ¤tstateAPI.QueryRoomsForUserRequest{ + UserID: device.UserID, + WantMembership: "join", + }, &res) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("accountDB.GetMembershipsByLocalpart failed") + util.GetLogger(req.Context()).WithError(err).Error("QueryRoomsForUser failed") return jsonerror.InternalServerError() } @@ -152,7 +157,7 @@ func SetAvatarURL( } events, err := buildMembershipEvents( - req.Context(), memberships, newProfile, userID, cfg, evTime, rsAPI, + req.Context(), res.RoomIDs, newProfile, userID, cfg, evTime, rsAPI, ) switch e := err.(type) { case nil: @@ -207,8 +212,8 @@ func GetDisplayName( // SetDisplayName implements PUT /profile/{userID}/displayname // nolint:gocyclo func SetDisplayName( - req *http.Request, accountDB accounts.Database, device *userapi.Device, - userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, + req *http.Request, accountDB accounts.Database, stateAPI currentstateAPI.CurrentStateInternalAPI, + device *userapi.Device, userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, ) util.JSONResponse { if userID != device.UserID { return util.JSONResponse{ @@ -253,9 +258,13 @@ func SetDisplayName( return jsonerror.InternalServerError() } - memberships, err := accountDB.GetMembershipsByLocalpart(req.Context(), localpart) + var res currentstateAPI.QueryRoomsForUserResponse + err = stateAPI.QueryRoomsForUser(req.Context(), ¤tstateAPI.QueryRoomsForUserRequest{ + UserID: device.UserID, + WantMembership: "join", + }, &res) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("accountDB.GetMembershipsByLocalpart failed") + util.GetLogger(req.Context()).WithError(err).Error("QueryRoomsForUser failed") return jsonerror.InternalServerError() } @@ -266,7 +275,7 @@ func SetDisplayName( } events, err := buildMembershipEvents( - req.Context(), memberships, newProfile, userID, cfg, evTime, rsAPI, + req.Context(), res.RoomIDs, newProfile, userID, cfg, evTime, rsAPI, ) switch e := err.(type) { case nil: @@ -335,14 +344,14 @@ func getProfile( func buildMembershipEvents( ctx context.Context, - memberships []authtypes.Membership, + roomIDs []string, newProfile authtypes.Profile, userID string, cfg *config.Dendrite, evTime time.Time, rsAPI api.RoomserverInternalAPI, ) ([]gomatrixserverlib.HeaderedEvent, error) { evs := []gomatrixserverlib.HeaderedEvent{} - for _, membership := range memberships { - verReq := api.QueryRoomVersionForRoomRequest{RoomID: membership.RoomID} + for _, roomID := range roomIDs { + verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} verRes := api.QueryRoomVersionForRoomResponse{} if err := rsAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil { return []gomatrixserverlib.HeaderedEvent{}, err @@ -350,7 +359,7 @@ func buildMembershipEvents( builder := gomatrixserverlib.EventBuilder{ Sender: userID, - RoomID: membership.RoomID, + RoomID: roomID, Type: "m.room.member", StateKey: &userID, } diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 9dfff0f2..deaa7b32 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -23,6 +23,7 @@ import ( appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" @@ -58,6 +59,7 @@ func Setup( syncProducer *producers.SyncAPIProducer, transactionsCache *transactions.Cache, federationSender federationSenderAPI.FederationSenderInternalAPI, + stateAPI currentstateAPI.CurrentStateInternalAPI, ) { publicAPIMux.Handle("/client/versions", @@ -98,7 +100,7 @@ func Setup( ).Methods(http.MethodPost, http.MethodOptions) r0mux.Handle("/joined_rooms", httputil.MakeAuthAPI("joined_rooms", userAPI, func(req *http.Request, device *api.Device) util.JSONResponse { - return GetJoinedRooms(req, device, accountDB) + return GetJoinedRooms(req, device, stateAPI) }), ).Methods(http.MethodGet, http.MethodOptions) r0mux.Handle("/rooms/{roomID}/join", @@ -307,7 +309,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI) + return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI, stateAPI) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -404,7 +406,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, rsAPI) + return SetAvatarURL(req, accountDB, stateAPI, device, vars["userID"], cfg, rsAPI) }), ).Methods(http.MethodPut, http.MethodOptions) // Browsers use the OPTIONS HTTP method to check if the CORS policy allows @@ -426,7 +428,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SetDisplayName(req, accountDB, device, vars["userID"], cfg, rsAPI) + return SetDisplayName(req, accountDB, stateAPI, device, vars["userID"], cfg, rsAPI) }), ).Methods(http.MethodPut, http.MethodOptions) // Browsers use the OPTIONS HTTP method to check if the CORS policy allows diff --git a/clientapi/routing/sendtyping.go b/clientapi/routing/sendtyping.go index 9b6a0b39..54a82286 100644 --- a/clientapi/routing/sendtyping.go +++ b/clientapi/routing/sendtyping.go @@ -13,15 +13,15 @@ package routing import ( - "database/sql" "net/http" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/userutil" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/eduserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/storage/accounts" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -36,6 +36,7 @@ func SendTyping( req *http.Request, device *userapi.Device, roomID string, userID string, accountDB accounts.Database, eduAPI api.EDUServerInputAPI, + stateAPI currentstateAPI.CurrentStateInternalAPI, ) util.JSONResponse { if device.UserID != userID { return util.JSONResponse{ @@ -44,23 +45,38 @@ func SendTyping( } } - localpart, err := userutil.ParseUsernameParam(userID, nil) + // Verify that the user is a member of this room + tuple := gomatrixserverlib.StateKeyTuple{ + EventType: gomatrixserverlib.MRoomMember, + StateKey: userID, + } + var res currentstateAPI.QueryCurrentStateResponse + err := stateAPI.QueryCurrentState(req.Context(), ¤tstateAPI.QueryCurrentStateRequest{ + RoomID: roomID, + StateTuples: []gomatrixserverlib.StateKeyTuple{tuple}, + }, &res) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("userutil.ParseUsernameParam failed") + util.GetLogger(req.Context()).WithError(err).Error("QueryCurrentState failed") return jsonerror.InternalServerError() } - - // Verify that the user is a member of this room - _, err = accountDB.GetMembershipInRoomByLocalpart(req.Context(), localpart, roomID) - if err == sql.ErrNoRows { + ev := res.StateEvents[tuple] + if ev == nil { return util.JSONResponse{ Code: http.StatusForbidden, JSON: jsonerror.Forbidden("User not in this room"), } - } else if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("accountDB.GetMembershipInRoomByLocalPart failed") + } + membership, err := ev.Membership() + if err != nil { + util.GetLogger(req.Context()).WithError(err).Error("Member event isn't valid") return jsonerror.InternalServerError() } + if membership != gomatrixserverlib.Join { + return util.JSONResponse{ + Code: http.StatusForbidden, + JSON: jsonerror.Forbidden("User not in this room"), + } + } // parse the incoming http request var r typingContentJSON |