aboutsummaryrefslogtreecommitdiff
path: root/roomserver/internal/query/query.go
diff options
context:
space:
mode:
Diffstat (limited to 'roomserver/internal/query/query.go')
-rw-r--r--roomserver/internal/query/query.go504
1 files changed, 504 insertions, 0 deletions
diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go
new file mode 100644
index 00000000..b2799aef
--- /dev/null
+++ b/roomserver/internal/query/query.go
@@ -0,0 +1,504 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package query
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/matrix-org/dendrite/internal/caching"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/roomserver/internal/helpers"
+ "github.com/matrix-org/dendrite/roomserver/state"
+ "github.com/matrix-org/dendrite/roomserver/storage"
+ "github.com/matrix-org/dendrite/roomserver/types"
+ "github.com/matrix-org/dendrite/roomserver/version"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ "github.com/sirupsen/logrus"
+)
+
+type Queryer struct {
+ DB storage.Database
+ Cache caching.RoomServerCaches
+}
+
+// QueryLatestEventsAndState implements api.RoomserverInternalAPI
+func (r *Queryer) QueryLatestEventsAndState(
+ ctx context.Context,
+ request *api.QueryLatestEventsAndStateRequest,
+ response *api.QueryLatestEventsAndStateResponse,
+) error {
+ return helpers.QueryLatestEventsAndState(ctx, r.DB, request, response)
+}
+
+// QueryStateAfterEvents implements api.RoomserverInternalAPI
+func (r *Queryer) QueryStateAfterEvents(
+ ctx context.Context,
+ request *api.QueryStateAfterEventsRequest,
+ response *api.QueryStateAfterEventsResponse,
+) error {
+ info, err := r.DB.RoomInfo(ctx, request.RoomID)
+ if err != nil {
+ return err
+ }
+ if info == nil || info.IsStub {
+ return nil
+ }
+
+ roomState := state.NewStateResolution(r.DB, *info)
+ response.RoomExists = true
+ response.RoomVersion = info.RoomVersion
+
+ prevStates, err := r.DB.StateAtEventIDs(ctx, request.PrevEventIDs)
+ if err != nil {
+ switch err.(type) {
+ case types.MissingEventError:
+ return nil
+ default:
+ return err
+ }
+ }
+ response.PrevEventsExist = true
+
+ // Look up the currrent state for the requested tuples.
+ stateEntries, err := roomState.LoadStateAfterEventsForStringTuples(
+ ctx, prevStates, request.StateToFetch,
+ )
+ if err != nil {
+ return err
+ }
+
+ stateEvents, err := helpers.LoadStateEvents(ctx, r.DB, stateEntries)
+ if err != nil {
+ return err
+ }
+
+ for _, event := range stateEvents {
+ response.StateEvents = append(response.StateEvents, event.Headered(info.RoomVersion))
+ }
+
+ return nil
+}
+
+// QueryEventsByID implements api.RoomserverInternalAPI
+func (r *Queryer) QueryEventsByID(
+ ctx context.Context,
+ request *api.QueryEventsByIDRequest,
+ response *api.QueryEventsByIDResponse,
+) error {
+ eventNIDMap, err := r.DB.EventNIDs(ctx, request.EventIDs)
+ if err != nil {
+ return err
+ }
+
+ var eventNIDs []types.EventNID
+ for _, nid := range eventNIDMap {
+ eventNIDs = append(eventNIDs, nid)
+ }
+
+ events, err := helpers.LoadEvents(ctx, r.DB, eventNIDs)
+ if err != nil {
+ return err
+ }
+
+ for _, event := range events {
+ roomVersion, verr := r.roomVersion(event.RoomID())
+ if verr != nil {
+ return verr
+ }
+
+ response.Events = append(response.Events, event.Headered(roomVersion))
+ }
+
+ return nil
+}
+
+// QueryMembershipForUser implements api.RoomserverInternalAPI
+func (r *Queryer) QueryMembershipForUser(
+ ctx context.Context,
+ request *api.QueryMembershipForUserRequest,
+ response *api.QueryMembershipForUserResponse,
+) error {
+ info, err := r.DB.RoomInfo(ctx, request.RoomID)
+ if err != nil {
+ return err
+ }
+
+ membershipEventNID, stillInRoom, err := r.DB.GetMembership(ctx, info.RoomNID, request.UserID)
+ if err != nil {
+ return err
+ }
+
+ if membershipEventNID == 0 {
+ response.HasBeenInRoom = false
+ return nil
+ }
+
+ response.IsInRoom = stillInRoom
+ response.HasBeenInRoom = true
+
+ evs, err := r.DB.Events(ctx, []types.EventNID{membershipEventNID})
+ if err != nil {
+ return err
+ }
+ if len(evs) != 1 {
+ return fmt.Errorf("failed to load membership event for event NID %d", membershipEventNID)
+ }
+
+ response.EventID = evs[0].EventID()
+ response.Membership, err = evs[0].Membership()
+ return err
+}
+
+// QueryMembershipsForRoom implements api.RoomserverInternalAPI
+func (r *Queryer) QueryMembershipsForRoom(
+ ctx context.Context,
+ request *api.QueryMembershipsForRoomRequest,
+ response *api.QueryMembershipsForRoomResponse,
+) error {
+ info, err := r.DB.RoomInfo(ctx, request.RoomID)
+ if err != nil {
+ return err
+ }
+
+ membershipEventNID, stillInRoom, err := r.DB.GetMembership(ctx, info.RoomNID, request.Sender)
+ if err != nil {
+ return err
+ }
+
+ if membershipEventNID == 0 {
+ response.HasBeenInRoom = false
+ response.JoinEvents = nil
+ return nil
+ }
+
+ response.HasBeenInRoom = true
+ response.JoinEvents = []gomatrixserverlib.ClientEvent{}
+
+ var events []types.Event
+ var stateEntries []types.StateEntry
+ if stillInRoom {
+ var eventNIDs []types.EventNID
+ eventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, request.JoinedOnly, false)
+ if err != nil {
+ return err
+ }
+
+ events, err = r.DB.Events(ctx, eventNIDs)
+ } else {
+ stateEntries, err = helpers.StateBeforeEvent(ctx, r.DB, *info, membershipEventNID)
+ if err != nil {
+ logrus.WithField("membership_event_nid", membershipEventNID).WithError(err).Error("failed to load state before event")
+ return err
+ }
+ events, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntries, request.JoinedOnly)
+ }
+
+ if err != nil {
+ return err
+ }
+
+ for _, event := range events {
+ clientEvent := gomatrixserverlib.ToClientEvent(event.Event, gomatrixserverlib.FormatAll)
+ response.JoinEvents = append(response.JoinEvents, clientEvent)
+ }
+
+ return nil
+}
+
+// QueryServerAllowedToSeeEvent implements api.RoomserverInternalAPI
+func (r *Queryer) QueryServerAllowedToSeeEvent(
+ ctx context.Context,
+ request *api.QueryServerAllowedToSeeEventRequest,
+ response *api.QueryServerAllowedToSeeEventResponse,
+) (err error) {
+ events, err := r.DB.EventsFromIDs(ctx, []string{request.EventID})
+ if err != nil {
+ return
+ }
+ if len(events) == 0 {
+ response.AllowedToSeeEvent = false // event doesn't exist so not allowed to see
+ return
+ }
+ roomID := events[0].RoomID()
+ isServerInRoom, err := helpers.IsServerCurrentlyInRoom(ctx, r.DB, request.ServerName, roomID)
+ if err != nil {
+ return
+ }
+ info, err := r.DB.RoomInfo(ctx, roomID)
+ if err != nil {
+ return err
+ }
+ if info == nil {
+ return fmt.Errorf("QueryServerAllowedToSeeEvent: no room info for room %s", roomID)
+ }
+ response.AllowedToSeeEvent, err = helpers.CheckServerAllowedToSeeEvent(
+ ctx, r.DB, *info, request.EventID, request.ServerName, isServerInRoom,
+ )
+ return
+}
+
+// QueryMissingEvents implements api.RoomserverInternalAPI
+// nolint:gocyclo
+func (r *Queryer) QueryMissingEvents(
+ ctx context.Context,
+ request *api.QueryMissingEventsRequest,
+ response *api.QueryMissingEventsResponse,
+) error {
+ var front []string
+ eventsToFilter := make(map[string]bool, len(request.LatestEvents))
+ visited := make(map[string]bool, request.Limit) // request.Limit acts as a hint to size.
+ for _, id := range request.EarliestEvents {
+ visited[id] = true
+ }
+
+ for _, id := range request.LatestEvents {
+ if !visited[id] {
+ front = append(front, id)
+ eventsToFilter[id] = true
+ }
+ }
+ events, err := r.DB.EventsFromIDs(ctx, front)
+ if err != nil {
+ return err
+ }
+ if len(events) == 0 {
+ return nil // we are missing the events being asked to search from, give up.
+ }
+ info, err := r.DB.RoomInfo(ctx, events[0].RoomID())
+ if err != nil {
+ return err
+ }
+ if info == nil || info.IsStub {
+ return fmt.Errorf("missing RoomInfo for room %s", events[0].RoomID())
+ }
+
+ resultNIDs, err := helpers.ScanEventTree(ctx, r.DB, *info, front, visited, request.Limit, request.ServerName)
+ if err != nil {
+ return err
+ }
+
+ loadedEvents, err := helpers.LoadEvents(ctx, r.DB, resultNIDs)
+ if err != nil {
+ return err
+ }
+
+ response.Events = make([]gomatrixserverlib.HeaderedEvent, 0, len(loadedEvents)-len(eventsToFilter))
+ for _, event := range loadedEvents {
+ if !eventsToFilter[event.EventID()] {
+ roomVersion, verr := r.roomVersion(event.RoomID())
+ if verr != nil {
+ return verr
+ }
+
+ response.Events = append(response.Events, event.Headered(roomVersion))
+ }
+ }
+
+ return err
+}
+
+// QueryStateAndAuthChain implements api.RoomserverInternalAPI
+func (r *Queryer) QueryStateAndAuthChain(
+ ctx context.Context,
+ request *api.QueryStateAndAuthChainRequest,
+ response *api.QueryStateAndAuthChainResponse,
+) error {
+ info, err := r.DB.RoomInfo(ctx, request.RoomID)
+ if err != nil {
+ return err
+ }
+ if info == nil || info.IsStub {
+ return nil
+ }
+ response.RoomExists = true
+ response.RoomVersion = info.RoomVersion
+
+ stateEvents, err := r.loadStateAtEventIDs(ctx, *info, request.PrevEventIDs)
+ if err != nil {
+ return err
+ }
+ response.PrevEventsExist = true
+
+ // add the auth event IDs for the current state events too
+ var authEventIDs []string
+ authEventIDs = append(authEventIDs, request.AuthEventIDs...)
+ for _, se := range stateEvents {
+ authEventIDs = append(authEventIDs, se.AuthEventIDs()...)
+ }
+ authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe
+
+ authEvents, err := getAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
+ if err != nil {
+ return err
+ }
+
+ if request.ResolveState {
+ if stateEvents, err = state.ResolveConflictsAdhoc(
+ info.RoomVersion, stateEvents, authEvents,
+ ); err != nil {
+ return err
+ }
+ }
+
+ for _, event := range stateEvents {
+ response.StateEvents = append(response.StateEvents, event.Headered(info.RoomVersion))
+ }
+
+ for _, event := range authEvents {
+ response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion))
+ }
+
+ return err
+}
+
+func (r *Queryer) loadStateAtEventIDs(ctx context.Context, roomInfo types.RoomInfo, eventIDs []string) ([]gomatrixserverlib.Event, error) {
+ roomState := state.NewStateResolution(r.DB, roomInfo)
+ prevStates, err := r.DB.StateAtEventIDs(ctx, eventIDs)
+ if err != nil {
+ switch err.(type) {
+ case types.MissingEventError:
+ return nil, nil
+ default:
+ return nil, err
+ }
+ }
+
+ // Look up the currrent state for the requested tuples.
+ stateEntries, err := roomState.LoadCombinedStateAfterEvents(
+ ctx, prevStates,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ return helpers.LoadStateEvents(ctx, r.DB, stateEntries)
+}
+
+type eventsFromIDs func(context.Context, []string) ([]types.Event, error)
+
+// getAuthChain fetches the auth chain for the given auth events. An auth chain
+// is the list of all events that are referenced in the auth_events section, and
+// all their auth_events, recursively. The returned set of events contain the
+// given events. Will *not* error if we don't have all auth events.
+func getAuthChain(
+ ctx context.Context, fn eventsFromIDs, authEventIDs []string,
+) ([]gomatrixserverlib.Event, error) {
+ // List of event IDs to fetch. On each pass, these events will be requested
+ // from the database and the `eventsToFetch` will be updated with any new
+ // events that we have learned about and need to find. When `eventsToFetch`
+ // is eventually empty, we should have reached the end of the chain.
+ eventsToFetch := authEventIDs
+ authEventsMap := make(map[string]gomatrixserverlib.Event)
+
+ for len(eventsToFetch) > 0 {
+ // Try to retrieve the events from the database.
+ events, err := fn(ctx, eventsToFetch)
+ if err != nil {
+ return nil, err
+ }
+
+ // We've now fetched these events so clear out `eventsToFetch`. Soon we may
+ // add newly discovered events to this for the next pass.
+ eventsToFetch = eventsToFetch[:0]
+
+ for _, event := range events {
+ // Store the event in the event map - this prevents us from requesting it
+ // from the database again.
+ authEventsMap[event.EventID()] = event.Event
+
+ // Extract all of the auth events from the newly obtained event. If we
+ // don't already have a record of the event, record it in the list of
+ // events we want to request for the next pass.
+ for _, authEvent := range event.AuthEvents() {
+ if _, ok := authEventsMap[authEvent.EventID]; !ok {
+ eventsToFetch = append(eventsToFetch, authEvent.EventID)
+ }
+ }
+ }
+ }
+
+ // We've now retrieved all of the events we can. Flatten them down into an
+ // array and return them.
+ var authEvents []gomatrixserverlib.Event
+ for _, event := range authEventsMap {
+ authEvents = append(authEvents, event)
+ }
+
+ return authEvents, nil
+}
+
+// QueryRoomVersionCapabilities implements api.RoomserverInternalAPI
+func (r *Queryer) QueryRoomVersionCapabilities(
+ ctx context.Context,
+ request *api.QueryRoomVersionCapabilitiesRequest,
+ response *api.QueryRoomVersionCapabilitiesResponse,
+) error {
+ response.DefaultRoomVersion = version.DefaultRoomVersion()
+ response.AvailableRoomVersions = make(map[gomatrixserverlib.RoomVersion]string)
+ for v, desc := range version.SupportedRoomVersions() {
+ if desc.Stable {
+ response.AvailableRoomVersions[v] = "stable"
+ } else {
+ response.AvailableRoomVersions[v] = "unstable"
+ }
+ }
+ return nil
+}
+
+// QueryRoomVersionCapabilities implements api.RoomserverInternalAPI
+func (r *Queryer) QueryRoomVersionForRoom(
+ ctx context.Context,
+ request *api.QueryRoomVersionForRoomRequest,
+ response *api.QueryRoomVersionForRoomResponse,
+) error {
+ if roomVersion, ok := r.Cache.GetRoomVersion(request.RoomID); ok {
+ response.RoomVersion = roomVersion
+ return nil
+ }
+
+ info, err := r.DB.RoomInfo(ctx, request.RoomID)
+ if err != nil {
+ return err
+ }
+ if info == nil {
+ return fmt.Errorf("QueryRoomVersionForRoom: missing room info for room %s", request.RoomID)
+ }
+ response.RoomVersion = info.RoomVersion
+ r.Cache.StoreRoomVersion(request.RoomID, response.RoomVersion)
+ return nil
+}
+
+func (r *Queryer) roomVersion(roomID string) (gomatrixserverlib.RoomVersion, error) {
+ var res api.QueryRoomVersionForRoomResponse
+ err := r.QueryRoomVersionForRoom(context.Background(), &api.QueryRoomVersionForRoomRequest{
+ RoomID: roomID,
+ }, &res)
+ return res.RoomVersion, err
+}
+
+func (r *Queryer) QueryPublishedRooms(
+ ctx context.Context,
+ req *api.QueryPublishedRoomsRequest,
+ res *api.QueryPublishedRoomsResponse,
+) error {
+ rooms, err := r.DB.GetPublishedRooms(ctx)
+ if err != nil {
+ return err
+ }
+ res.RoomIDs = rooms
+ return nil
+}