aboutsummaryrefslogtreecommitdiff
path: root/clientapi
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-06-30 13:34:59 +0100
committerGitHub <noreply@github.com>2020-06-30 13:34:59 +0100
commit6f49758b90d655d9c2bb9170da2ea1d0a2bdd664 (patch)
treef434cc48f27c3915fffdb39c5b907fd021d4ff61 /clientapi
parentca5bbffd8d987b220c8f8eb888a2fc9b9cef104c (diff)
Remove membership table from account DB (#1172)
* Remove membership table from account DB And make code which needs that data use the currentstate server * Unbreak tests; use a membership enum for space
Diffstat (limited to 'clientapi')
-rw-r--r--clientapi/clientapi.go14
-rw-r--r--clientapi/consumers/roomserver.go92
-rw-r--r--clientapi/routing/memberships.go20
-rw-r--r--clientapi/routing/profile.go37
-rw-r--r--clientapi/routing/routing.go10
-rw-r--r--clientapi/routing/sendtyping.go36
6 files changed, 67 insertions, 142 deletions
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go
index 174eb1bf..8ea84249 100644
--- a/clientapi/clientapi.go
+++ b/clientapi/clientapi.go
@@ -18,9 +18,9 @@ import (
"github.com/Shopify/sarama"
"github.com/gorilla/mux"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
- "github.com/matrix-org/dendrite/clientapi/consumers"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/routing"
+ currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
@@ -30,14 +30,12 @@ import (
"github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/dendrite/userapi/storage/devices"
"github.com/matrix-org/gomatrixserverlib"
- "github.com/sirupsen/logrus"
)
// AddPublicRoutes sets up and registers HTTP handlers for the ClientAPI component.
func AddPublicRoutes(
router *mux.Router,
cfg *config.Dendrite,
- consumer sarama.Consumer,
producer sarama.SyncProducer,
deviceDB devices.Database,
accountsDB accounts.Database,
@@ -45,6 +43,7 @@ func AddPublicRoutes(
rsAPI roomserverAPI.RoomserverInternalAPI,
eduInputAPI eduServerAPI.EDUServerInputAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
+ stateAPI currentstateAPI.CurrentStateInternalAPI,
transactionsCache *transactions.Cache,
fsAPI federationSenderAPI.FederationSenderInternalAPI,
userAPI userapi.UserInternalAPI,
@@ -54,16 +53,9 @@ func AddPublicRoutes(
Topic: string(cfg.Kafka.Topics.OutputClientData),
}
- roomEventConsumer := consumers.NewOutputRoomEventConsumer(
- cfg, consumer, accountsDB, rsAPI,
- )
- if err := roomEventConsumer.Start(); err != nil {
- logrus.WithError(err).Panicf("failed to start room server consumer")
- }
-
routing.Setup(
router, cfg, eduInputAPI, rsAPI, asAPI,
accountsDB, deviceDB, userAPI, federation,
- syncProducer, transactionsCache, fsAPI,
+ syncProducer, transactionsCache, fsAPI, stateAPI,
)
}
diff --git a/clientapi/consumers/roomserver.go b/clientapi/consumers/roomserver.go
deleted file mode 100644
index beeda042..00000000
--- a/clientapi/consumers/roomserver.go
+++ /dev/null
@@ -1,92 +0,0 @@
-// Copyright 2017 Vector Creations Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package consumers
-
-import (
- "context"
- "encoding/json"
-
- "github.com/matrix-org/dendrite/internal"
- "github.com/matrix-org/dendrite/internal/config"
- "github.com/matrix-org/dendrite/roomserver/api"
- "github.com/matrix-org/dendrite/userapi/storage/accounts"
- "github.com/matrix-org/gomatrixserverlib"
-
- "github.com/Shopify/sarama"
- log "github.com/sirupsen/logrus"
-)
-
-// OutputRoomEventConsumer consumes events that originated in the room server.
-type OutputRoomEventConsumer struct {
- rsAPI api.RoomserverInternalAPI
- rsConsumer *internal.ContinualConsumer
- db accounts.Database
- serverName string
-}
-
-// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
-func NewOutputRoomEventConsumer(
- cfg *config.Dendrite,
- kafkaConsumer sarama.Consumer,
- store accounts.Database,
- rsAPI api.RoomserverInternalAPI,
-) *OutputRoomEventConsumer {
-
- consumer := internal.ContinualConsumer{
- Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
- Consumer: kafkaConsumer,
- PartitionStore: store,
- }
- s := &OutputRoomEventConsumer{
- rsConsumer: &consumer,
- db: store,
- rsAPI: rsAPI,
- serverName: string(cfg.Matrix.ServerName),
- }
- consumer.ProcessMessage = s.onMessage
-
- return s
-}
-
-// Start consuming from room servers
-func (s *OutputRoomEventConsumer) Start() error {
- return s.rsConsumer.Start()
-}
-
-// onMessage is called when the sync server receives a new event from the room server output log.
-// It is not safe for this function to be called from multiple goroutines, or else the
-// sync stream position may race and be incorrectly calculated.
-func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
- // Parse out the event JSON
- var output api.OutputEvent
- if err := json.Unmarshal(msg.Value, &output); err != nil {
- // If the message was invalid, log it and move on to the next message in the stream
- log.WithError(err).Errorf("roomserver output log: message parse failure")
- return nil
- }
-
- if output.Type != api.OutputTypeNewRoomEvent {
- log.WithField("type", output.Type).Debug(
- "roomserver output log: ignoring unknown output type",
- )
- return nil
- }
-
- return s.db.UpdateMemberships(
- context.TODO(),
- gomatrixserverlib.UnwrapEventHeaders(output.NewRoomEvent.AddsState()),
- output.NewRoomEvent.RemovesStateEventIDs,
- )
-}
diff --git a/clientapi/routing/memberships.go b/clientapi/routing/memberships.go
index 1c9800b6..9c4cf749 100644
--- a/clientapi/routing/memberships.go
+++ b/clientapi/routing/memberships.go
@@ -18,9 +18,8 @@ import (
"encoding/json"
"net/http"
- "github.com/matrix-org/dendrite/userapi/storage/accounts"
-
"github.com/matrix-org/dendrite/clientapi/jsonerror"
+ currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
@@ -95,20 +94,19 @@ func GetMemberships(
func GetJoinedRooms(
req *http.Request,
device *userapi.Device,
- accountsDB accounts.Database,
+ stateAPI currentstateAPI.CurrentStateInternalAPI,
) util.JSONResponse {
- localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
- if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed")
- return jsonerror.InternalServerError()
- }
- joinedRooms, err := accountsDB.GetRoomIDsByLocalPart(req.Context(), localpart)
+ var res currentstateAPI.QueryRoomsForUserResponse
+ err := stateAPI.QueryRoomsForUser(req.Context(), &currentstateAPI.QueryRoomsForUserRequest{
+ UserID: device.UserID,
+ WantMembership: "join",
+ }, &res)
if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("accountsDB.GetRoomIDsByLocalPart failed")
+ util.GetLogger(req.Context()).WithError(err).Error("QueryRoomsForUser failed")
return jsonerror.InternalServerError()
}
return util.JSONResponse{
Code: http.StatusOK,
- JSON: getJoinedRoomsResponse{joinedRooms},
+ JSON: getJoinedRoomsResponse{res.RoomIDs},
}
}
diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go
index 7c2cd19b..1df4c9b3 100644
--- a/clientapi/routing/profile.go
+++ b/clientapi/routing/profile.go
@@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
+ currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -93,8 +94,8 @@ func GetAvatarURL(
// SetAvatarURL implements PUT /profile/{userID}/avatar_url
// nolint:gocyclo
func SetAvatarURL(
- req *http.Request, accountDB accounts.Database, device *userapi.Device,
- userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
+ req *http.Request, accountDB accounts.Database, stateAPI currentstateAPI.CurrentStateInternalAPI,
+ device *userapi.Device, userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
) util.JSONResponse {
if userID != device.UserID {
return util.JSONResponse{
@@ -139,9 +140,13 @@ func SetAvatarURL(
return jsonerror.InternalServerError()
}
- memberships, err := accountDB.GetMembershipsByLocalpart(req.Context(), localpart)
+ var res currentstateAPI.QueryRoomsForUserResponse
+ err = stateAPI.QueryRoomsForUser(req.Context(), &currentstateAPI.QueryRoomsForUserRequest{
+ UserID: device.UserID,
+ WantMembership: "join",
+ }, &res)
if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("accountDB.GetMembershipsByLocalpart failed")
+ util.GetLogger(req.Context()).WithError(err).Error("QueryRoomsForUser failed")
return jsonerror.InternalServerError()
}
@@ -152,7 +157,7 @@ func SetAvatarURL(
}
events, err := buildMembershipEvents(
- req.Context(), memberships, newProfile, userID, cfg, evTime, rsAPI,
+ req.Context(), res.RoomIDs, newProfile, userID, cfg, evTime, rsAPI,
)
switch e := err.(type) {
case nil:
@@ -207,8 +212,8 @@ func GetDisplayName(
// SetDisplayName implements PUT /profile/{userID}/displayname
// nolint:gocyclo
func SetDisplayName(
- req *http.Request, accountDB accounts.Database, device *userapi.Device,
- userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
+ req *http.Request, accountDB accounts.Database, stateAPI currentstateAPI.CurrentStateInternalAPI,
+ device *userapi.Device, userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
) util.JSONResponse {
if userID != device.UserID {
return util.JSONResponse{
@@ -253,9 +258,13 @@ func SetDisplayName(
return jsonerror.InternalServerError()
}
- memberships, err := accountDB.GetMembershipsByLocalpart(req.Context(), localpart)
+ var res currentstateAPI.QueryRoomsForUserResponse
+ err = stateAPI.QueryRoomsForUser(req.Context(), &currentstateAPI.QueryRoomsForUserRequest{
+ UserID: device.UserID,
+ WantMembership: "join",
+ }, &res)
if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("accountDB.GetMembershipsByLocalpart failed")
+ util.GetLogger(req.Context()).WithError(err).Error("QueryRoomsForUser failed")
return jsonerror.InternalServerError()
}
@@ -266,7 +275,7 @@ func SetDisplayName(
}
events, err := buildMembershipEvents(
- req.Context(), memberships, newProfile, userID, cfg, evTime, rsAPI,
+ req.Context(), res.RoomIDs, newProfile, userID, cfg, evTime, rsAPI,
)
switch e := err.(type) {
case nil:
@@ -335,14 +344,14 @@ func getProfile(
func buildMembershipEvents(
ctx context.Context,
- memberships []authtypes.Membership,
+ roomIDs []string,
newProfile authtypes.Profile, userID string, cfg *config.Dendrite,
evTime time.Time, rsAPI api.RoomserverInternalAPI,
) ([]gomatrixserverlib.HeaderedEvent, error) {
evs := []gomatrixserverlib.HeaderedEvent{}
- for _, membership := range memberships {
- verReq := api.QueryRoomVersionForRoomRequest{RoomID: membership.RoomID}
+ for _, roomID := range roomIDs {
+ verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := rsAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil {
return []gomatrixserverlib.HeaderedEvent{}, err
@@ -350,7 +359,7 @@ func buildMembershipEvents(
builder := gomatrixserverlib.EventBuilder{
Sender: userID,
- RoomID: membership.RoomID,
+ RoomID: roomID,
Type: "m.room.member",
StateKey: &userID,
}
diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go
index 9dfff0f2..deaa7b32 100644
--- a/clientapi/routing/routing.go
+++ b/clientapi/routing/routing.go
@@ -23,6 +23,7 @@ import (
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
+ currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
@@ -58,6 +59,7 @@ func Setup(
syncProducer *producers.SyncAPIProducer,
transactionsCache *transactions.Cache,
federationSender federationSenderAPI.FederationSenderInternalAPI,
+ stateAPI currentstateAPI.CurrentStateInternalAPI,
) {
publicAPIMux.Handle("/client/versions",
@@ -98,7 +100,7 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/joined_rooms",
httputil.MakeAuthAPI("joined_rooms", userAPI, func(req *http.Request, device *api.Device) util.JSONResponse {
- return GetJoinedRooms(req, device, accountDB)
+ return GetJoinedRooms(req, device, stateAPI)
}),
).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/join",
@@ -307,7 +309,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI)
+ return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI, stateAPI)
}),
).Methods(http.MethodPut, http.MethodOptions)
@@ -404,7 +406,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, rsAPI)
+ return SetAvatarURL(req, accountDB, stateAPI, device, vars["userID"], cfg, rsAPI)
}),
).Methods(http.MethodPut, http.MethodOptions)
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
@@ -426,7 +428,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return SetDisplayName(req, accountDB, device, vars["userID"], cfg, rsAPI)
+ return SetDisplayName(req, accountDB, stateAPI, device, vars["userID"], cfg, rsAPI)
}),
).Methods(http.MethodPut, http.MethodOptions)
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
diff --git a/clientapi/routing/sendtyping.go b/clientapi/routing/sendtyping.go
index 9b6a0b39..54a82286 100644
--- a/clientapi/routing/sendtyping.go
+++ b/clientapi/routing/sendtyping.go
@@ -13,15 +13,15 @@
package routing
import (
- "database/sql"
"net/http"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/userutil"
+ currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/eduserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
+ "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
@@ -36,6 +36,7 @@ func SendTyping(
req *http.Request, device *userapi.Device, roomID string,
userID string, accountDB accounts.Database,
eduAPI api.EDUServerInputAPI,
+ stateAPI currentstateAPI.CurrentStateInternalAPI,
) util.JSONResponse {
if device.UserID != userID {
return util.JSONResponse{
@@ -44,23 +45,38 @@ func SendTyping(
}
}
- localpart, err := userutil.ParseUsernameParam(userID, nil)
+ // Verify that the user is a member of this room
+ tuple := gomatrixserverlib.StateKeyTuple{
+ EventType: gomatrixserverlib.MRoomMember,
+ StateKey: userID,
+ }
+ var res currentstateAPI.QueryCurrentStateResponse
+ err := stateAPI.QueryCurrentState(req.Context(), &currentstateAPI.QueryCurrentStateRequest{
+ RoomID: roomID,
+ StateTuples: []gomatrixserverlib.StateKeyTuple{tuple},
+ }, &res)
if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("userutil.ParseUsernameParam failed")
+ util.GetLogger(req.Context()).WithError(err).Error("QueryCurrentState failed")
return jsonerror.InternalServerError()
}
-
- // Verify that the user is a member of this room
- _, err = accountDB.GetMembershipInRoomByLocalpart(req.Context(), localpart, roomID)
- if err == sql.ErrNoRows {
+ ev := res.StateEvents[tuple]
+ if ev == nil {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("User not in this room"),
}
- } else if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("accountDB.GetMembershipInRoomByLocalPart failed")
+ }
+ membership, err := ev.Membership()
+ if err != nil {
+ util.GetLogger(req.Context()).WithError(err).Error("Member event isn't valid")
return jsonerror.InternalServerError()
}
+ if membership != gomatrixserverlib.Join {
+ return util.JSONResponse{
+ Code: http.StatusForbidden,
+ JSON: jsonerror.Forbidden("User not in this room"),
+ }
+ }
// parse the incoming http request
var r typingContentJSON