aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-09-07 12:38:09 +0100
committerGitHub <noreply@github.com>2020-09-07 12:38:09 +0100
commit79137599217808d9f9585628408a6c2aa26ab11d (patch)
tree6b7aa04541b563767ddd0ca04c873c634127678c
parent895ead804893191b34fd52a549b22331997d45f7 (diff)
Remove QueryBulkStateContent from current state server (#1404)
* Remove QueryBulkStateContent from current state server Expected fail due to db impl not existing * Implement query bulk state content * Fix up rejecting invites over federation * Fix bulk content marshalling
-rw-r--r--clientapi/routing/directory_public.go14
-rw-r--r--clientapi/routing/routing.go2
-rw-r--r--cmd/dendrite-demo-libp2p/publicrooms.go2
-rw-r--r--currentstateserver/api/api.go32
-rw-r--r--currentstateserver/api/wrapper.go89
-rw-r--r--currentstateserver/internal/api.go24
-rw-r--r--currentstateserver/inthttp/client.go15
-rw-r--r--currentstateserver/inthttp/server.go19
-rw-r--r--federationapi/routing/publicrooms.go18
-rw-r--r--federationapi/routing/routing.go2
-rw-r--r--roomserver/api/query.go33
-rw-r--r--roomserver/internal/query/query.go3
-rw-r--r--roomserver/storage/interface.go2
-rw-r--r--roomserver/storage/shared/storage.go87
-rw-r--r--roomserver/storage/tables/interface.go43
-rw-r--r--syncapi/consumers/keychange.go4
-rw-r--r--syncapi/internal/keychange.go12
-rw-r--r--syncapi/internal/keychange_test.go34
-rw-r--r--syncapi/sync/requestpool.go2
19 files changed, 198 insertions, 239 deletions
diff --git a/clientapi/routing/directory_public.go b/clientapi/routing/directory_public.go
index bae7e49b..fd7bc1e8 100644
--- a/clientapi/routing/directory_public.go
+++ b/clientapi/routing/directory_public.go
@@ -26,7 +26,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/api"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal/config"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
@@ -51,7 +50,7 @@ type filter struct {
// GetPostPublicRooms implements GET and POST /publicRooms
func GetPostPublicRooms(
- req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
+ req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider,
federation *gomatrixserverlib.FederationClient,
cfg *config.ClientAPI,
@@ -75,7 +74,7 @@ func GetPostPublicRooms(
}
}
- response, err := publicRooms(req.Context(), request, rsAPI, stateAPI, extRoomsProvider)
+ response, err := publicRooms(req.Context(), request, rsAPI, extRoomsProvider)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Errorf("failed to work out public rooms")
return jsonerror.InternalServerError()
@@ -86,8 +85,8 @@ func GetPostPublicRooms(
}
}
-func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI,
- stateAPI currentstateAPI.CurrentStateInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider,
+func publicRooms(
+ ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider,
) (*gomatrixserverlib.RespPublicRooms, error) {
response := gomatrixserverlib.RespPublicRooms{
@@ -110,7 +109,7 @@ func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI
var rooms []gomatrixserverlib.PublicRoom
if request.Since == "" {
- rooms = refreshPublicRoomCache(ctx, rsAPI, extRoomsProvider, stateAPI)
+ rooms = refreshPublicRoomCache(ctx, rsAPI, extRoomsProvider)
} else {
rooms = getPublicRoomsFromCache()
}
@@ -226,7 +225,6 @@ func sliceInto(slice []gomatrixserverlib.PublicRoom, since int64, limit int16) (
func refreshPublicRoomCache(
ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider,
- stateAPI currentstateAPI.CurrentStateInternalAPI,
) []gomatrixserverlib.PublicRoom {
cacheMu.Lock()
defer cacheMu.Unlock()
@@ -241,7 +239,7 @@ func refreshPublicRoomCache(
util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed")
return publicRoomsCache
}
- pubRooms, err := currentstateAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, stateAPI)
+ pubRooms, err := roomserverAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, rsAPI)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed")
return publicRoomsCache
diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go
index 0445852d..97ab03e3 100644
--- a/clientapi/routing/routing.go
+++ b/clientapi/routing/routing.go
@@ -336,7 +336,7 @@ func Setup(
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/publicRooms",
httputil.MakeExternalAPI("public_rooms", func(req *http.Request) util.JSONResponse {
- return GetPostPublicRooms(req, rsAPI, stateAPI, extRoomsProvider, federation, cfg)
+ return GetPostPublicRooms(req, rsAPI, extRoomsProvider, federation, cfg)
}),
).Methods(http.MethodGet, http.MethodPost, http.MethodOptions)
diff --git a/cmd/dendrite-demo-libp2p/publicrooms.go b/cmd/dendrite-demo-libp2p/publicrooms.go
index 2160ddef..838ba77b 100644
--- a/cmd/dendrite-demo-libp2p/publicrooms.go
+++ b/cmd/dendrite-demo-libp2p/publicrooms.go
@@ -106,7 +106,7 @@ func (p *publicRoomsProvider) AdvertiseRooms() error {
util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed")
return err
}
- ourRooms, err := currentstateAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, p.stateAPI)
+ ourRooms, err := roomserverAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, p.rsAPI)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed")
return err
diff --git a/currentstateserver/api/api.go b/currentstateserver/api/api.go
index f11422ac..536ae0ed 100644
--- a/currentstateserver/api/api.go
+++ b/currentstateserver/api/api.go
@@ -14,37 +14,5 @@
package api
-import (
- "context"
-
- "github.com/matrix-org/gomatrixserverlib"
-)
-
type CurrentStateInternalAPI interface {
- // QueryBulkStateContent does a bulk query for state event content in the given rooms.
- QueryBulkStateContent(ctx context.Context, req *QueryBulkStateContentRequest, res *QueryBulkStateContentResponse) error
-}
-
-type QueryBulkStateContentRequest struct {
- // Returns state events in these rooms
- RoomIDs []string
- // If true, treats the '*' StateKey as "all state events of this type" rather than a literal value of '*'
- AllowWildcards bool
- // The state events to return. Only a small subset of tuples are allowed in this request as only certain events
- // have their content fields extracted. Specifically, the tuple Type must be one of:
- // m.room.avatar
- // m.room.create
- // m.room.canonical_alias
- // m.room.guest_access
- // m.room.history_visibility
- // m.room.join_rules
- // m.room.member
- // m.room.name
- // m.room.topic
- // Any other tuple type will result in the query failing.
- StateTuples []gomatrixserverlib.StateKeyTuple
-}
-type QueryBulkStateContentResponse struct {
- // map of room ID -> tuple -> content_value
- Rooms map[string]map[gomatrixserverlib.StateKeyTuple]string
}
diff --git a/currentstateserver/api/wrapper.go b/currentstateserver/api/wrapper.go
deleted file mode 100644
index 20fae825..00000000
--- a/currentstateserver/api/wrapper.go
+++ /dev/null
@@ -1,89 +0,0 @@
-// Copyright 2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package api
-
-import (
- "context"
-
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/matrix-org/util"
-)
-
-// PopulatePublicRooms extracts PublicRoom information for all the provided room IDs. The IDs are not checked to see if they are visible in the
-// published room directory.
-// due to lots of switches
-// nolint:gocyclo
-func PopulatePublicRooms(ctx context.Context, roomIDs []string, stateAPI CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) {
- avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""}
- nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""}
- canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""}
- topicTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.topic", StateKey: ""}
- guestTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.guest_access", StateKey: ""}
- visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""}
- joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""}
-
- var stateRes QueryBulkStateContentResponse
- err := stateAPI.QueryBulkStateContent(ctx, &QueryBulkStateContentRequest{
- RoomIDs: roomIDs,
- AllowWildcards: true,
- StateTuples: []gomatrixserverlib.StateKeyTuple{
- nameTuple, canonicalTuple, topicTuple, guestTuple, visibilityTuple, joinRuleTuple, avatarTuple,
- {EventType: gomatrixserverlib.MRoomMember, StateKey: "*"},
- },
- }, &stateRes)
- if err != nil {
- util.GetLogger(ctx).WithError(err).Error("QueryBulkStateContent failed")
- return nil, err
- }
- chunk := make([]gomatrixserverlib.PublicRoom, len(roomIDs))
- i := 0
- for roomID, data := range stateRes.Rooms {
- pub := gomatrixserverlib.PublicRoom{
- RoomID: roomID,
- }
- joinCount := 0
- var joinRule, guestAccess string
- for tuple, contentVal := range data {
- if tuple.EventType == gomatrixserverlib.MRoomMember && contentVal == "join" {
- joinCount++
- continue
- }
- switch tuple {
- case avatarTuple:
- pub.AvatarURL = contentVal
- case nameTuple:
- pub.Name = contentVal
- case topicTuple:
- pub.Topic = contentVal
- case canonicalTuple:
- pub.CanonicalAlias = contentVal
- case visibilityTuple:
- pub.WorldReadable = contentVal == "world_readable"
- // need both of these to determine whether guests can join
- case joinRuleTuple:
- joinRule = contentVal
- case guestTuple:
- guestAccess = contentVal
- }
- }
- if joinRule == gomatrixserverlib.Public && guestAccess == "can_join" {
- pub.GuestCanJoin = true
- }
- pub.JoinedMembersCount = joinCount
- chunk[i] = pub
- i++
- }
- return chunk, nil
-}
diff --git a/currentstateserver/internal/api.go b/currentstateserver/internal/api.go
index 2d6da1e6..f218fa19 100644
--- a/currentstateserver/internal/api.go
+++ b/currentstateserver/internal/api.go
@@ -15,33 +15,9 @@
package internal
import (
- "context"
-
- "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/currentstateserver/storage"
- "github.com/matrix-org/gomatrixserverlib"
)
type CurrentStateInternalAPI struct {
DB storage.Database
}
-
-func (a *CurrentStateInternalAPI) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkStateContentRequest, res *api.QueryBulkStateContentResponse) error {
- events, err := a.DB.GetBulkStateContent(ctx, req.RoomIDs, req.StateTuples, req.AllowWildcards)
- if err != nil {
- return err
- }
- res.Rooms = make(map[string]map[gomatrixserverlib.StateKeyTuple]string)
- for _, ev := range events {
- if res.Rooms[ev.RoomID] == nil {
- res.Rooms[ev.RoomID] = make(map[gomatrixserverlib.StateKeyTuple]string)
- }
- room := res.Rooms[ev.RoomID]
- room[gomatrixserverlib.StateKeyTuple{
- EventType: ev.EventType,
- StateKey: ev.StateKey,
- }] = ev.ContentValue
- res.Rooms[ev.RoomID] = room
- }
- return nil
-}
diff --git a/currentstateserver/inthttp/client.go b/currentstateserver/inthttp/client.go
index 13130716..20176849 100644
--- a/currentstateserver/inthttp/client.go
+++ b/currentstateserver/inthttp/client.go
@@ -15,13 +15,10 @@
package inthttp
import (
- "context"
"errors"
"net/http"
"github.com/matrix-org/dendrite/currentstateserver/api"
- "github.com/matrix-org/dendrite/internal/httputil"
- "github.com/opentracing/opentracing-go"
)
// HTTP paths for the internal HTTP APIs
@@ -49,15 +46,3 @@ type httpCurrentStateInternalAPI struct {
apiURL string
httpClient *http.Client
}
-
-func (h *httpCurrentStateInternalAPI) QueryBulkStateContent(
- ctx context.Context,
- request *api.QueryBulkStateContentRequest,
- response *api.QueryBulkStateContentResponse,
-) error {
- span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBulkStateContent")
- defer span.Finish()
-
- apiURL := h.apiURL + QueryBulkStateContentPath
- return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
-}
diff --git a/currentstateserver/inthttp/server.go b/currentstateserver/inthttp/server.go
index 70d6ecfd..3847344c 100644
--- a/currentstateserver/inthttp/server.go
+++ b/currentstateserver/inthttp/server.go
@@ -15,27 +15,10 @@
package inthttp
import (
- "encoding/json"
- "net/http"
-
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/currentstateserver/api"
- "github.com/matrix-org/dendrite/internal/httputil"
- "github.com/matrix-org/util"
)
func AddRoutes(internalAPIMux *mux.Router, intAPI api.CurrentStateInternalAPI) {
- internalAPIMux.Handle(QueryBulkStateContentPath,
- httputil.MakeInternalAPI("queryBulkStateContent", func(req *http.Request) util.JSONResponse {
- request := api.QueryBulkStateContentRequest{}
- response := api.QueryBulkStateContentResponse{}
- if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
- return util.MessageResponse(http.StatusBadRequest, err.Error())
- }
- if err := intAPI.QueryBulkStateContent(req.Context(), &request, &response); err != nil {
- return util.ErrorResponse(err)
- }
- return util.JSONResponse{Code: http.StatusOK, JSON: &response}
- }),
- )
+
}
diff --git a/federationapi/routing/publicrooms.go b/federationapi/routing/publicrooms.go
index 3807a518..d923a236 100644
--- a/federationapi/routing/publicrooms.go
+++ b/federationapi/routing/publicrooms.go
@@ -7,7 +7,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@@ -24,7 +23,7 @@ type filter struct {
}
// GetPostPublicRooms implements GET and POST /publicRooms
-func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI) util.JSONResponse {
+func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI) util.JSONResponse {
var request PublicRoomReq
if fillErr := fillPublicRoomsReq(req, &request); fillErr != nil {
return *fillErr
@@ -32,7 +31,7 @@ func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInterna
if request.Limit == 0 {
request.Limit = 50
}
- response, err := publicRooms(req.Context(), request, rsAPI, stateAPI)
+ response, err := publicRooms(req.Context(), request, rsAPI)
if err != nil {
return jsonerror.InternalServerError()
}
@@ -42,8 +41,9 @@ func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInterna
}
}
-func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI,
- stateAPI currentstateAPI.CurrentStateInternalAPI) (*gomatrixserverlib.RespPublicRooms, error) {
+func publicRooms(
+ ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI,
+) (*gomatrixserverlib.RespPublicRooms, error) {
var response gomatrixserverlib.RespPublicRooms
var limit int16
@@ -80,7 +80,7 @@ func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI
nextIndex = len(queryRes.RoomIDs)
}
roomIDs := queryRes.RoomIDs[offset:nextIndex]
- response.Chunk, err = fillInRooms(ctx, roomIDs, stateAPI)
+ response.Chunk, err = fillInRooms(ctx, roomIDs, rsAPI)
return &response, err
}
@@ -112,7 +112,7 @@ func fillPublicRoomsReq(httpReq *http.Request, request *PublicRoomReq) *util.JSO
// due to lots of switches
// nolint:gocyclo
-func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI.CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) {
+func fillInRooms(ctx context.Context, roomIDs []string, rsAPI roomserverAPI.RoomserverInternalAPI) ([]gomatrixserverlib.PublicRoom, error) {
avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""}
nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""}
canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""}
@@ -121,8 +121,8 @@ func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI
visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""}
joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""}
- var stateRes currentstateAPI.QueryBulkStateContentResponse
- err := stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
+ var stateRes roomserverAPI.QueryBulkStateContentResponse
+ err := rsAPI.QueryBulkStateContent(ctx, &roomserverAPI.QueryBulkStateContentRequest{
RoomIDs: roomIDs,
AllowWildcards: true,
StateTuples: []gomatrixserverlib.StateKeyTuple{
diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go
index 4c43be27..7d60d15e 100644
--- a/federationapi/routing/routing.go
+++ b/federationapi/routing/routing.go
@@ -390,7 +390,7 @@ func Setup(
v1fedmux.Handle("/publicRooms",
httputil.MakeExternalAPI("federation_public_rooms", func(req *http.Request) util.JSONResponse {
- return GetPostPublicRooms(req, rsAPI, stateAPI)
+ return GetPostPublicRooms(req, rsAPI)
}),
).Methods(http.MethodGet)
diff --git a/roomserver/api/query.go b/roomserver/api/query.go
index d0d0474d..67a217c8 100644
--- a/roomserver/api/query.go
+++ b/roomserver/api/query.go
@@ -303,6 +303,39 @@ type QueryServerBannedFromRoomResponse struct {
Banned bool `json:"banned"`
}
+// MarshalJSON stringifies the room ID and StateKeyTuple keys so they can be sent over the wire in HTTP API mode.
+func (r *QueryBulkStateContentResponse) MarshalJSON() ([]byte, error) {
+ se := make(map[string]string)
+ for roomID, tupleToEvent := range r.Rooms {
+ for tuple, event := range tupleToEvent {
+ // use 0x1F (unit separator) as the delimiter between room ID/type/state key,
+ se[fmt.Sprintf("%s\x1F%s\x1F%s", roomID, tuple.EventType, tuple.StateKey)] = event
+ }
+ }
+ return json.Marshal(se)
+}
+
+func (r *QueryBulkStateContentResponse) UnmarshalJSON(data []byte) error {
+ wireFormat := make(map[string]string)
+ err := json.Unmarshal(data, &wireFormat)
+ if err != nil {
+ return err
+ }
+ r.Rooms = make(map[string]map[gomatrixserverlib.StateKeyTuple]string)
+ for roomTuple, value := range wireFormat {
+ fields := strings.Split(roomTuple, "\x1F")
+ roomID := fields[0]
+ if r.Rooms[roomID] == nil {
+ r.Rooms[roomID] = make(map[gomatrixserverlib.StateKeyTuple]string)
+ }
+ r.Rooms[roomID][gomatrixserverlib.StateKeyTuple{
+ EventType: fields[1],
+ StateKey: fields[2],
+ }] = value
+ }
+ return nil
+}
+
// MarshalJSON stringifies the StateKeyTuple keys so they can be sent over the wire in HTTP API mode.
func (r *QueryCurrentStateResponse) MarshalJSON() ([]byte, error) {
se := make(map[string]*gomatrixserverlib.HeaderedEvent, len(r.StateEvents))
diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go
index f76c9316..b34ae770 100644
--- a/roomserver/internal/query/query.go
+++ b/roomserver/internal/query/query.go
@@ -140,6 +140,9 @@ func (r *Queryer) QueryMembershipForUser(
if err != nil {
return err
}
+ if info == nil {
+ return fmt.Errorf("QueryMembershipForUser: unknown room %s", request.RoomID)
+ }
membershipEventNID, stillInRoom, err := r.DB.GetMembership(ctx, info.RoomNID, request.UserID)
if err != nil {
diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go
index c4119f7e..be724da6 100644
--- a/roomserver/storage/interface.go
+++ b/roomserver/storage/interface.go
@@ -17,9 +17,9 @@ package storage
import (
"context"
- "github.com/matrix-org/dendrite/currentstateserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage/shared"
+ "github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go
index a081603f..5c18c725 100644
--- a/roomserver/storage/shared/storage.go
+++ b/roomserver/storage/shared/storage.go
@@ -7,7 +7,6 @@ import (
"fmt"
"sort"
- csstables "github.com/matrix-org/dendrite/currentstateserver/storage/tables"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -799,8 +798,90 @@ func (d *Database) GetRoomsByMembership(ctx context.Context, userID, membership
// GetBulkStateContent returns all state events which match a given room ID and a given state key tuple. Both must be satisfied for a match.
// If a tuple has the StateKey of '*' and allowWildcards=true then all state events with the EventType should be returned.
-func (d *Database) GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]csstables.StrippedEvent, error) {
- return nil, fmt.Errorf("not implemented yet")
+// nolint:gocyclo
+func (d *Database) GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]tables.StrippedEvent, error) {
+ eventTypes := make([]string, 0, len(tuples))
+ for _, tuple := range tuples {
+ eventTypes = append(eventTypes, tuple.EventType)
+ }
+ // we don't bother failing the request if we get asked for event types we don't know about, as all that would result in is no matches which
+ // isn't a failure.
+ eventTypeNIDMap, err := d.EventTypesTable.BulkSelectEventTypeNID(ctx, eventTypes)
+ if err != nil {
+ return nil, fmt.Errorf("GetBulkStateContent: failed to map event type nids: %w", err)
+ }
+ typeNIDSet := make(map[types.EventTypeNID]bool)
+ for _, nid := range eventTypeNIDMap {
+ typeNIDSet[nid] = true
+ }
+
+ allowWildcard := make(map[types.EventTypeNID]bool)
+ eventStateKeys := make([]string, 0, len(tuples))
+ for _, tuple := range tuples {
+ if allowWildcards && tuple.StateKey == "*" {
+ allowWildcard[eventTypeNIDMap[tuple.EventType]] = true
+ continue
+ }
+ eventStateKeys = append(eventStateKeys, tuple.StateKey)
+
+ }
+
+ eventStateKeyNIDMap, err := d.EventStateKeysTable.BulkSelectEventStateKeyNID(ctx, eventStateKeys)
+ if err != nil {
+ return nil, fmt.Errorf("GetBulkStateContent: failed to map state key nids: %w", err)
+ }
+ stateKeyNIDSet := make(map[types.EventStateKeyNID]bool)
+ for _, nid := range eventStateKeyNIDMap {
+ stateKeyNIDSet[nid] = true
+ }
+
+ var eventNIDs []types.EventNID
+ eventNIDToVer := make(map[types.EventNID]gomatrixserverlib.RoomVersion)
+ // TODO: This feels like this is going to be really slow...
+ for _, roomID := range roomIDs {
+ roomInfo, err2 := d.RoomInfo(ctx, roomID)
+ if err2 != nil {
+ return nil, fmt.Errorf("GetBulkStateContent: failed to load room info for room %s : %w", roomID, err2)
+ }
+ // for unknown rooms or rooms which we don't have the current state, skip them.
+ if roomInfo == nil || roomInfo.IsStub {
+ continue
+ }
+ entries, err2 := d.loadStateAtSnapshot(ctx, roomInfo.StateSnapshotNID)
+ if err2 != nil {
+ return nil, fmt.Errorf("GetBulkStateContent: failed to load state for room %s : %w", roomID, err2)
+ }
+ for _, entry := range entries {
+ if typeNIDSet[entry.EventTypeNID] {
+ if allowWildcard[entry.EventTypeNID] || stateKeyNIDSet[entry.EventStateKeyNID] {
+ eventNIDs = append(eventNIDs, entry.EventNID)
+ eventNIDToVer[entry.EventNID] = roomInfo.RoomVersion
+ }
+ }
+ }
+ }
+
+ events, err := d.EventJSONTable.BulkSelectEventJSON(ctx, eventNIDs)
+ if err != nil {
+ return nil, fmt.Errorf("GetBulkStateContent: failed to load event JSON for event nids: %w", err)
+ }
+ result := make([]tables.StrippedEvent, len(events))
+ for i := range events {
+ roomVer := eventNIDToVer[events[i].EventNID]
+ ev, err := gomatrixserverlib.NewEventFromTrustedJSON(events[i].EventJSON, false, roomVer)
+ if err != nil {
+ return nil, fmt.Errorf("GetBulkStateContent: failed to load event JSON for event NID %v : %w", events[i].EventNID, err)
+ }
+ hev := ev.Headered(roomVer)
+ result[i] = tables.StrippedEvent{
+ EventType: ev.Type(),
+ RoomID: ev.RoomID(),
+ StateKey: *ev.StateKey(),
+ ContentValue: tables.ExtractContentValue(&hev),
+ }
+ }
+
+ return result, nil
}
// JoinedUsersSetInRooms returns all joined users in the rooms given, along with the count of how many times they appear.
diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go
index a142f2b1..adb06212 100644
--- a/roomserver/storage/tables/interface.go
+++ b/roomserver/storage/tables/interface.go
@@ -6,6 +6,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/tidwall/gjson"
)
type EventJSONPair struct {
@@ -155,3 +156,45 @@ type Redactions interface {
// successfully redacted the event JSON.
MarkRedactionValidated(ctx context.Context, txn *sql.Tx, redactionEventID string, validated bool) error
}
+
+// StrippedEvent represents a stripped event for returning extracted content values.
+type StrippedEvent struct {
+ RoomID string
+ EventType string
+ StateKey string
+ ContentValue string
+}
+
+// ExtractContentValue from the given state event. For example, given an m.room.name event with:
+// content: { name: "Foo" }
+// this returns "Foo".
+func ExtractContentValue(ev *gomatrixserverlib.HeaderedEvent) string {
+ content := ev.Content()
+ key := ""
+ switch ev.Type() {
+ case gomatrixserverlib.MRoomCreate:
+ key = "creator"
+ case gomatrixserverlib.MRoomCanonicalAlias:
+ key = "alias"
+ case gomatrixserverlib.MRoomHistoryVisibility:
+ key = "history_visibility"
+ case gomatrixserverlib.MRoomJoinRules:
+ key = "join_rule"
+ case gomatrixserverlib.MRoomMember:
+ key = "membership"
+ case gomatrixserverlib.MRoomName:
+ key = "name"
+ case "m.room.avatar":
+ key = "url"
+ case "m.room.topic":
+ key = "topic"
+ case "m.room.guest_access":
+ key = "guest_access"
+ }
+ result := gjson.GetBytes(content, key)
+ if !result.Exists() {
+ return ""
+ }
+ // this returns the empty string if this is not a string type
+ return result.Str
+}
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index 33797378..5b50bac2 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -133,7 +133,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are now sharing rooms with which we previously were not and notify them about the joining
// users keys:
- changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
+ changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
if err != nil {
log.WithError(err).Error("OnJoinEvent: failed to work out changed users")
return
@@ -146,7 +146,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere
func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are no longer sharing any rooms with and notify them about the leaving user
- _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
+ _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
if err != nil {
log.WithError(err).Error("OnLeaveEvent: failed to work out left users")
return
diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go
index f2f50aef..090e0c65 100644
--- a/syncapi/internal/keychange.go
+++ b/syncapi/internal/keychange.go
@@ -19,7 +19,6 @@ import (
"strings"
"github.com/Shopify/sarama"
- currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/keyserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
@@ -50,7 +49,6 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID,
// nolint:gocyclo
func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
- stateAPI currentstateAPI.CurrentStateInternalAPI,
userID string, res *types.Response, from, to types.StreamingToken,
) (hasNew bool, err error) {
@@ -58,7 +56,7 @@ func DeviceListCatchup(
newlyJoinedRooms := joinedRooms(res, userID)
newlyLeftRooms := leftRooms(res)
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
- changed, left, err := TrackChangedUsers(ctx, rsAPI, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms)
+ changed, left, err := TrackChangedUsers(ctx, rsAPI, userID, newlyJoinedRooms, newlyLeftRooms)
if err != nil {
return false, err
}
@@ -144,7 +142,7 @@ func DeviceListCatchup(
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
// nolint:gocyclo
func TrackChangedUsers(
- ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
+ ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
) (changed, left []string, err error) {
// process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
@@ -161,8 +159,8 @@ func TrackChangedUsers(
if err != nil {
return nil, nil, err
}
- var stateRes currentstateAPI.QueryBulkStateContentResponse
- err = stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
+ var stateRes roomserverAPI.QueryBulkStateContentResponse
+ err = rsAPI.QueryBulkStateContent(ctx, &roomserverAPI.QueryBulkStateContentRequest{
RoomIDs: newlyLeftRooms,
StateTuples: []gomatrixserverlib.StateKeyTuple{
{
@@ -202,7 +200,7 @@ func TrackChangedUsers(
if err != nil {
return nil, left, err
}
- err = stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
+ err = rsAPI.QueryBulkStateContent(ctx, &roomserverAPI.QueryBulkStateContentRequest{
RoomIDs: newlyJoinedRooms,
StateTuples: []gomatrixserverlib.StateKeyTuple{
{
diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go
index 7d739430..c2501181 100644
--- a/syncapi/internal/keychange_test.go
+++ b/syncapi/internal/keychange_test.go
@@ -7,7 +7,6 @@ import (
"testing"
"github.com/Shopify/sarama"
- stateapi "github.com/matrix-org/dendrite/currentstateserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -108,25 +107,6 @@ func (s *mockRoomserverAPI) QuerySharedUsers(ctx context.Context, req *api.Query
return nil
}
-type mockStateAPI struct {
- rsAPI *mockRoomserverAPI
-}
-
-// QueryBulkStateContent does a bulk query for state event content in the given rooms.
-func (s *mockStateAPI) QueryBulkStateContent(ctx context.Context, req *stateapi.QueryBulkStateContentRequest, res *stateapi.QueryBulkStateContentResponse) error {
- var res2 api.QueryBulkStateContentResponse
- err := s.rsAPI.QueryBulkStateContent(ctx, &api.QueryBulkStateContentRequest{
- RoomIDs: req.RoomIDs,
- AllowWildcards: req.AllowWildcards,
- StateTuples: req.StateTuples,
- }, &res2)
- if err != nil {
- return err
- }
- res.Rooms = res2.Rooms
- return nil
-}
-
type wantCatchup struct {
hasNew bool
changed []string
@@ -200,7 +180,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
"!another:room": {syncingUser},
},
}
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -223,7 +203,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
"!another:room": {syncingUser},
},
}
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -246,7 +226,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
"!another:room": {syncingUser, existingUser},
},
}
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -268,7 +248,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
"!another:room": {syncingUser, existingUser},
},
}
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -327,7 +307,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
roomID: {syncingUser, existingUser},
},
}
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -355,7 +335,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
"!another:room": {syncingUser},
},
}
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -441,7 +421,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
},
}
hasNew, err := DeviceListCatchup(
- context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken,
+ context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken,
)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index 2859da71..319a8149 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -267,7 +267,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
func (rp *RequestPool) appendDeviceLists(
data *types.Response, userID string, since, to types.StreamingToken,
) (*types.Response, error) {
- _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, rp.stateAPI, userID, data, since, to)
+ _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, userID, data, since, to)
if err != nil {
return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err)
}