aboutsummaryrefslogtreecommitdiff
path: root/roomserver
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-04-28 11:46:47 +0100
committerGitHub <noreply@github.com>2020-04-28 11:46:47 +0100
commit6d832ae544a6221eb01dc7bad170d3b25a534a1e (patch)
treead0eda5ec31154e83188fec4cbc49252cb85b505 /roomserver
parent3a858afca2368f588b2681de4f4816f26686f540 (diff)
Implement backfill in the roomserver (#983)
* Initial cut for backfilling The syncserver now asks the roomserver via QueryBackfill (which already existed to *handle* backfill requests) which then makes federation requests via gomatrixserverlib.RequestBackfill. Currently, tests fail on subsequent /messages requests because we don't know which servers are in the room, because we are unable to get state snapshots from a backfilled event because that code doesn't exist yet. * WIP backfill, doesn't work * Make initial backfill pass checks * Persist backfilled events with state snapshots * Remove debug lines * Linting * Review comments
Diffstat (limited to 'roomserver')
-rw-r--r--roomserver/api/query.go46
-rw-r--r--roomserver/input/events.go8
-rw-r--r--roomserver/query/backfill.go217
-rw-r--r--roomserver/query/query.go147
-rw-r--r--roomserver/roomserver.go8
-rw-r--r--roomserver/storage/interface.go3
6 files changed, 329 insertions, 100 deletions
diff --git a/roomserver/api/query.go b/roomserver/api/query.go
index b272b1eb..11fa5c9c 100644
--- a/roomserver/api/query.go
+++ b/roomserver/api/query.go
@@ -229,6 +229,8 @@ type QueryStateAndAuthChainResponse struct {
// QueryBackfillRequest is a request to QueryBackfill.
type QueryBackfillRequest struct {
+ // The room to backfill
+ RoomID string `json:"room_id"`
// Events to start paginating from.
EarliestEventsIDs []string `json:"earliest_event_ids"`
// The maximum number of events to retrieve.
@@ -243,21 +245,7 @@ type QueryBackfillResponse struct {
Events []gomatrixserverlib.HeaderedEvent `json:"events"`
}
-// QueryServersInRoomAtEventRequest is a request to QueryServersInRoomAtEvent
-type QueryServersInRoomAtEventRequest struct {
- // ID of the room to retrieve member servers for.
- RoomID string `json:"room_id"`
- // ID of the event for which to retrieve member servers.
- EventID string `json:"event_id"`
-}
-
-// QueryServersInRoomAtEventResponse is a response to QueryServersInRoomAtEvent
-type QueryServersInRoomAtEventResponse struct {
- // Servers present in the room for these events.
- Servers []gomatrixserverlib.ServerName `json:"servers"`
-}
-
-// QueryRoomVersionCapabilities asks for the default room version
+// QueryRoomVersionCapabilitiesRequest asks for the default room version
type QueryRoomVersionCapabilitiesRequest struct{}
// QueryRoomVersionCapabilitiesResponse is a response to QueryRoomVersionCapabilitiesRequest
@@ -266,12 +254,12 @@ type QueryRoomVersionCapabilitiesResponse struct {
AvailableRoomVersions map[gomatrixserverlib.RoomVersion]string `json:"available"`
}
-// QueryRoomVersionForRoom asks for the room version for a given room.
+// QueryRoomVersionForRoomRequest asks for the room version for a given room.
type QueryRoomVersionForRoomRequest struct {
RoomID string `json:"room_id"`
}
-// QueryRoomVersionCapabilitiesResponse is a response to QueryServersInRoomAtEventResponse
+// QueryRoomVersionForRoomResponse is a response to QueryRoomVersionForRoomRequest
type QueryRoomVersionForRoomResponse struct {
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
}
@@ -350,12 +338,6 @@ type RoomserverQueryAPI interface {
response *QueryBackfillResponse,
) error
- QueryServersInRoomAtEvent(
- ctx context.Context,
- request *QueryServersInRoomAtEventRequest,
- response *QueryServersInRoomAtEventResponse,
- ) error
-
// Asks for the default room version as preferred by the server.
QueryRoomVersionCapabilities(
ctx context.Context,
@@ -401,13 +383,10 @@ const RoomserverQueryStateAndAuthChainPath = "/api/roomserver/queryStateAndAuthC
// RoomserverQueryBackfillPath is the HTTP path for the QueryBackfillPath API
const RoomserverQueryBackfillPath = "/api/roomserver/queryBackfill"
-// RoomserverQueryServersInRoomAtEventPath is the HTTP path for the QueryServersInRoomAtEvent API
-const RoomserverQueryServersInRoomAtEventPath = "/api/roomserver/queryServersInRoomAtEvents"
-
// RoomserverQueryRoomVersionCapabilitiesPath is the HTTP path for the QueryRoomVersionCapabilities API
const RoomserverQueryRoomVersionCapabilitiesPath = "/api/roomserver/queryRoomVersionCapabilities"
-// RoomserverQueryRoomVersionCapabilitiesPath is the HTTP path for the QueryRoomVersionCapabilities API
+// RoomserverQueryRoomVersionForRoomPath is the HTTP path for the QueryRoomVersionForRoom API
const RoomserverQueryRoomVersionForRoomPath = "/api/roomserver/queryRoomVersionForRoom"
// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API.
@@ -555,19 +534,6 @@ func (h *httpRoomserverQueryAPI) QueryBackfill(
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
-// QueryServersInRoomAtEvent implements RoomServerQueryAPI
-func (h *httpRoomserverQueryAPI) QueryServersInRoomAtEvent(
- ctx context.Context,
- request *QueryServersInRoomAtEventRequest,
- response *QueryServersInRoomAtEventResponse,
-) error {
- span, ctx := opentracing.StartSpanFromContext(ctx, "QueryServersInRoomAtEvent")
- defer span.Finish()
-
- apiURL := h.roomserverURL + RoomserverQueryServersInRoomAtEventPath
- return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
-}
-
// QueryRoomVersionCapabilities implements RoomServerQueryAPI
func (h *httpRoomserverQueryAPI) QueryRoomVersionCapabilities(
ctx context.Context,
diff --git a/roomserver/input/events.go b/roomserver/input/events.go
index 205035d9..69828d9f 100644
--- a/roomserver/input/events.go
+++ b/roomserver/input/events.go
@@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)
@@ -53,6 +54,7 @@ func processRoomEvent(
// Check that the event passes authentication checks and work out the numeric IDs for the auth events.
authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs)
if err != nil {
+ logrus.WithError(err).WithField("event_id", event.EventID()).Error("processRoomEvent.checkAuthEvents failed for event")
return
}
@@ -77,6 +79,7 @@ func processRoomEvent(
// For outliers we can stop after we've stored the event itself as it
// doesn't have any associated state to store and we don't need to
// notify anyone about it.
+ logrus.WithField("event_id", event.EventID()).WithField("type", event.Type()).WithField("room", event.RoomID()).Info("Stored outlier")
return event.EventID(), nil
}
@@ -89,11 +92,6 @@ func processRoomEvent(
}
}
- if input.Kind == api.KindBackfill {
- // Backfill is not implemented.
- panic("Not implemented")
- }
-
// Update the extremities of the event graph for the room
return event.EventID(), updateLatestEvents(
ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID,
diff --git a/roomserver/query/backfill.go b/roomserver/query/backfill.go
new file mode 100644
index 00000000..09a515e9
--- /dev/null
+++ b/roomserver/query/backfill.go
@@ -0,0 +1,217 @@
+package query
+
+import (
+ "context"
+
+ "github.com/matrix-org/dendrite/roomserver/storage"
+ "github.com/matrix-org/dendrite/roomserver/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/sirupsen/logrus"
+)
+
+// backfillRequester implements gomatrixserverlib.BackfillRequester
+type backfillRequester struct {
+ db storage.Database
+ fedClient *gomatrixserverlib.FederationClient
+ thisServer gomatrixserverlib.ServerName
+
+ // per-request state
+ servers []gomatrixserverlib.ServerName
+ eventIDToBeforeStateIDs map[string][]string
+ eventIDMap map[string]gomatrixserverlib.Event
+}
+
+func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName) *backfillRequester {
+ return &backfillRequester{
+ db: db,
+ fedClient: fedClient,
+ thisServer: thisServer,
+ eventIDToBeforeStateIDs: make(map[string][]string),
+ eventIDMap: make(map[string]gomatrixserverlib.Event),
+ }
+}
+
+func (b *backfillRequester) StateIDsBeforeEvent(ctx context.Context, targetEvent gomatrixserverlib.HeaderedEvent) ([]string, error) {
+ b.eventIDMap[targetEvent.EventID()] = targetEvent.Unwrap()
+ if ids, ok := b.eventIDToBeforeStateIDs[targetEvent.EventID()]; ok {
+ return ids, nil
+ }
+ // if we have exactly 1 prev event and we know the state of the room at that prev event, then just roll forward the prev event.
+ // Else, we have to hit /state_ids because either we don't know the state at all at this event (new backwards extremity) or
+ // we don't know the result of state res to merge forks (2 or more prev_events)
+ if len(targetEvent.PrevEventIDs()) == 1 {
+ prevEventID := targetEvent.PrevEventIDs()[0]
+ prevEvent, ok := b.eventIDMap[prevEventID]
+ if !ok {
+ goto FederationHit
+ }
+ prevEventStateIDs, ok := b.eventIDToBeforeStateIDs[prevEventID]
+ if !ok {
+ goto FederationHit
+ }
+ newStateIDs := b.calculateNewStateIDs(targetEvent.Unwrap(), prevEvent, prevEventStateIDs)
+ if newStateIDs != nil {
+ b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs
+ return newStateIDs, nil
+ }
+ // else we failed to calculate the new state, so fallthrough
+ }
+
+FederationHit:
+ var lastErr error
+ logrus.WithField("event_id", targetEvent.EventID()).Info("Requesting /state_ids at event")
+ for _, srv := range b.servers { // hit any valid server
+ c := gomatrixserverlib.FederatedStateProvider{
+ FedClient: b.fedClient,
+ AuthEventsOnly: false,
+ Server: srv,
+ }
+ res, err := c.StateIDsBeforeEvent(ctx, targetEvent)
+ if err != nil {
+ lastErr = err
+ continue
+ }
+ b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res
+ return res, nil
+ }
+ return nil, lastErr
+}
+
+func (b *backfillRequester) calculateNewStateIDs(targetEvent, prevEvent gomatrixserverlib.Event, prevEventStateIDs []string) []string {
+ newStateIDs := prevEventStateIDs[:]
+ if prevEvent.StateKey() == nil {
+ // state is the same as the previous event
+ b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs
+ return newStateIDs
+ }
+
+ missingState := false // true if we are missing the info for a state event ID
+ foundEvent := false // true if we found a (type, state_key) match
+ // find which state ID to replace, if any
+ for i, id := range newStateIDs {
+ ev, ok := b.eventIDMap[id]
+ if !ok {
+ missingState = true
+ continue
+ }
+ // The state IDs BEFORE the target event are the state IDs BEFORE the prev_event PLUS the prev_event itself
+ if ev.Type() == prevEvent.Type() && ev.StateKey() != nil && *ev.StateKey() == *prevEvent.StateKey() {
+ newStateIDs[i] = prevEvent.EventID()
+ foundEvent = true
+ break
+ }
+ }
+ if !foundEvent && !missingState {
+ // we can be certain that this is new state
+ newStateIDs = append(newStateIDs, prevEvent.EventID())
+ foundEvent = true
+ }
+
+ if foundEvent {
+ b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs
+ return newStateIDs
+ }
+ return nil
+}
+
+func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatrixserverlib.RoomVersion, event gomatrixserverlib.HeaderedEvent, eventIDs []string) (map[string]*gomatrixserverlib.Event, error) {
+ // try to fetch the events from the database first
+ events, err := b.ProvideEvents(roomVer, eventIDs)
+ if err != nil {
+ // non-fatal, fallthrough
+ logrus.WithError(err).Info("Failed to fetch events")
+ } else {
+ logrus.Infof("Fetched %d/%d events from the database", len(events), len(eventIDs))
+ if len(events) == len(eventIDs) {
+ result := make(map[string]*gomatrixserverlib.Event)
+ for i := range events {
+ result[events[i].EventID()] = &events[i]
+ b.eventIDMap[events[i].EventID()] = events[i]
+ }
+ return result, nil
+ }
+ }
+
+ c := gomatrixserverlib.FederatedStateProvider{
+ FedClient: b.fedClient,
+ AuthEventsOnly: false,
+ Server: b.servers[0],
+ }
+ result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs)
+ if err != nil {
+ return nil, err
+ }
+ for eventID, ev := range result {
+ b.eventIDMap[eventID] = *ev
+ }
+ return result, nil
+}
+
+// ServersAtEvent is called when trying to determine which server to request from.
+// It returns a list of servers which can be queried for backfill requests. These servers
+// will be servers that are in the room already. The entries at the beginning are preferred servers
+// and will be tried first. An empty list will fail the request.
+func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID string) (servers []gomatrixserverlib.ServerName) {
+ // getMembershipsBeforeEventNID requires a NID, so retrieving the NID for
+ // the event is necessary.
+ NIDs, err := b.db.EventNIDs(ctx, []string{eventID})
+ if err != nil {
+ logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get event NID for event")
+ return
+ }
+
+ // Retrieve all "m.room.member" state events of "join" membership, which
+ // contains the list of users in the room before the event, therefore all
+ // the servers in it at that moment.
+ events, err := getMembershipsBeforeEventNID(ctx, b.db, NIDs[eventID], true)
+ if err != nil {
+ logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get memberships before event")
+ return
+ }
+
+ // Store the server names in a temporary map to avoid duplicates.
+ serverSet := make(map[gomatrixserverlib.ServerName]bool)
+ for _, event := range events {
+ serverSet[event.Origin()] = true
+ }
+ for server := range serverSet {
+ if server == b.thisServer {
+ continue
+ }
+ servers = append(servers, server)
+ }
+ b.servers = servers
+ return
+}
+
+// Backfill performs a backfill request to the given server.
+// https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid
+func (b *backfillRequester) Backfill(ctx context.Context, server gomatrixserverlib.ServerName, roomID string, fromEventIDs []string, limit int) (*gomatrixserverlib.Transaction, error) {
+ tx, err := b.fedClient.Backfill(ctx, server, roomID, limit, fromEventIDs)
+ return &tx, err
+}
+
+func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.Event, error) {
+ ctx := context.Background()
+ nidMap, err := b.db.EventNIDs(ctx, eventIDs)
+ if err != nil {
+ logrus.WithError(err).WithField("event_ids", eventIDs).Error("Failed to find events")
+ return nil, err
+ }
+ eventNIDs := make([]types.EventNID, len(nidMap))
+ i := 0
+ for _, nid := range nidMap {
+ eventNIDs[i] = nid
+ i++
+ }
+ eventsWithNids, err := b.db.Events(ctx, eventNIDs)
+ if err != nil {
+ logrus.WithError(err).WithField("event_nids", eventNIDs).Error("Failed to load events")
+ return nil, err
+ }
+ events := make([]gomatrixserverlib.Event, len(eventsWithNids))
+ for i := range eventsWithNids {
+ events[i] = eventsWithNids[i].Event
+ }
+ return events, nil
+}
diff --git a/roomserver/query/query.go b/roomserver/query/query.go
index 7508d790..a54fa58d 100644
--- a/roomserver/query/query.go
+++ b/roomserver/query/query.go
@@ -19,6 +19,7 @@ package query
import (
"context"
"encoding/json"
+ "fmt"
"net/http"
"github.com/matrix-org/dendrite/common"
@@ -31,12 +32,16 @@ import (
"github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
+ "github.com/sirupsen/logrus"
)
// RoomserverQueryAPI is an implementation of api.RoomserverQueryAPI
type RoomserverQueryAPI struct {
DB storage.Database
ImmutableCache caching.ImmutableCache
+ ServerName gomatrixserverlib.ServerName
+ KeyRing gomatrixserverlib.JSONVerifier
+ FedClient *gomatrixserverlib.FederationClient
}
// QueryLatestEventsAndState implements api.RoomserverQueryAPI
@@ -281,7 +286,7 @@ func (r *RoomserverQueryAPI) QueryMembershipsForRoom(
events, err = r.DB.Events(ctx, eventNIDs)
} else {
- events, err = r.getMembershipsBeforeEventNID(ctx, membershipEventNID, request.JoinedOnly)
+ events, err = getMembershipsBeforeEventNID(ctx, r.DB, membershipEventNID, request.JoinedOnly)
}
if err != nil {
@@ -300,19 +305,19 @@ func (r *RoomserverQueryAPI) QueryMembershipsForRoom(
// of the event's room as it was when this event was fired, then filters the state events to
// only keep the "m.room.member" events with a "join" membership. These events are returned.
// Returns an error if there was an issue fetching the events.
-func (r *RoomserverQueryAPI) getMembershipsBeforeEventNID(
- ctx context.Context, eventNID types.EventNID, joinedOnly bool,
+func getMembershipsBeforeEventNID(
+ ctx context.Context, db storage.Database, eventNID types.EventNID, joinedOnly bool,
) ([]types.Event, error) {
- roomState := state.NewStateResolution(r.DB)
+ roomState := state.NewStateResolution(db)
events := []types.Event{}
// Lookup the event NID
- eIDs, err := r.DB.EventIDs(ctx, []types.EventNID{eventNID})
+ eIDs, err := db.EventIDs(ctx, []types.EventNID{eventNID})
if err != nil {
return nil, err
}
eventIDs := []string{eIDs[eventNID]}
- prevState, err := r.DB.StateAtEventIDs(ctx, eventIDs)
+ prevState, err := db.StateAtEventIDs(ctx, eventIDs)
if err != nil {
return nil, err
}
@@ -332,7 +337,7 @@ func (r *RoomserverQueryAPI) getMembershipsBeforeEventNID(
}
// Get all of the events in this state
- stateEvents, err := r.DB.Events(ctx, eventNIDs)
+ stateEvents, err := db.Events(ctx, eventNIDs)
if err != nil {
return nil, err
}
@@ -484,6 +489,13 @@ func (r *RoomserverQueryAPI) QueryBackfill(
request *api.QueryBackfillRequest,
response *api.QueryBackfillResponse,
) error {
+ // if we are requesting the backfill then we need to do a federation hit
+ // TODO: we could be more sensible and fetch as many events we already have then request the rest
+ // which is what the syncapi does already.
+ if request.ServerName == r.ServerName {
+ return r.backfillViaFederation(ctx, request, response)
+ }
+ // someone else is requesting the backfill, try to service their request.
var err error
var front []string
@@ -525,6 +537,55 @@ func (r *RoomserverQueryAPI) QueryBackfill(
return err
}
+func (r *RoomserverQueryAPI) backfillViaFederation(ctx context.Context, req *api.QueryBackfillRequest, res *api.QueryBackfillResponse) error {
+ roomVer, err := r.DB.GetRoomVersionForRoom(ctx, req.RoomID)
+ if err != nil {
+ return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err)
+ }
+ requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName)
+ events, err := gomatrixserverlib.RequestBackfill(
+ ctx, requester,
+ r.KeyRing, req.RoomID, roomVer, req.EarliestEventsIDs, req.Limit)
+ if err != nil {
+ return err
+ }
+ logrus.WithField("room_id", req.RoomID).Infof("backfilled %d events", len(events))
+
+ // persist these new events - auth checks have already been done
+ roomNID, backfilledEventMap := persistEvents(ctx, r.DB, events)
+ if err != nil {
+ return err
+ }
+
+ for _, ev := range backfilledEventMap {
+ // now add state for these events
+ stateIDs, ok := requester.eventIDToBeforeStateIDs[ev.EventID()]
+ if !ok {
+ // this should be impossible as all events returned must have pass Step 5 of the PDU checks
+ // which requires a list of state IDs.
+ logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to find state IDs for event which passed auth checks")
+ continue
+ }
+ var entries []types.StateEntry
+ if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs); err != nil {
+ return err
+ }
+
+ var beforeStateSnapshotNID types.StateSnapshotNID
+ if beforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, entries); err != nil {
+ return err
+ }
+ if err = r.DB.SetState(ctx, ev.EventNID, beforeStateSnapshotNID); err != nil {
+ logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to set state before event")
+ }
+ }
+
+ // TODO: update backwards extremities, as that should be moved from syncapi to roomserver at some point.
+
+ res.Events = events
+ return nil
+}
+
func (r *RoomserverQueryAPI) isServerCurrentlyInRoom(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID string) (bool, error) {
roomNID, err := r.DB.RoomNID(ctx, roomID)
if err != nil {
@@ -778,39 +839,33 @@ func getAuthChain(
return authEvents, nil
}
-// QueryServersInRoomAtEvent implements api.RoomserverQueryAPI
-func (r *RoomserverQueryAPI) QueryServersInRoomAtEvent(
- ctx context.Context,
- request *api.QueryServersInRoomAtEventRequest,
- response *api.QueryServersInRoomAtEventResponse,
-) error {
- // getMembershipsBeforeEventNID requires a NID, so retrieving the NID for
- // the event is necessary.
- NIDs, err := r.DB.EventNIDs(ctx, []string{request.EventID})
- if err != nil {
- return err
- }
-
- // Retrieve all "m.room.member" state events of "join" membership, which
- // contains the list of users in the room before the event, therefore all
- // the servers in it at that moment.
- events, err := r.getMembershipsBeforeEventNID(ctx, NIDs[request.EventID], true)
- if err != nil {
- return err
- }
-
- // Store the server names in a temporary map to avoid duplicates.
- servers := make(map[gomatrixserverlib.ServerName]bool)
- for _, event := range events {
- servers[event.Origin()] = true
- }
-
- // Populate the response.
- for server := range servers {
- response.Servers = append(response.Servers, server)
+func persistEvents(ctx context.Context, db storage.Database, events []gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) {
+ var roomNID types.RoomNID
+ backfilledEventMap := make(map[string]types.Event)
+ for _, ev := range events {
+ nidMap, err := db.EventNIDs(ctx, ev.AuthEventIDs())
+ if err != nil { // this shouldn't happen as RequestBackfill already found them
+ logrus.WithError(err).WithField("auth_events", ev.AuthEventIDs()).Error("Failed to find one or more auth events")
+ continue
+ }
+ authNids := make([]types.EventNID, len(nidMap))
+ i := 0
+ for _, nid := range nidMap {
+ authNids[i] = nid
+ i++
+ }
+ var stateAtEvent types.StateAtEvent
+ roomNID, stateAtEvent, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids)
+ if err != nil {
+ logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to store backfilled event")
+ continue
+ }
+ backfilledEventMap[ev.EventID()] = types.Event{
+ EventNID: stateAtEvent.StateEntry.EventNID,
+ Event: ev.Unwrap(),
+ }
}
-
- return nil
+ return roomNID, backfilledEventMap
}
// QueryRoomVersionCapabilities implements api.RoomserverQueryAPI
@@ -995,20 +1050,6 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
}),
)
servMux.Handle(
- api.RoomserverQueryServersInRoomAtEventPath,
- common.MakeInternalAPI("QueryServersInRoomAtEvent", func(req *http.Request) util.JSONResponse {
- var request api.QueryServersInRoomAtEventRequest
- var response api.QueryServersInRoomAtEventResponse
- if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
- return util.ErrorResponse(err)
- }
- if err := r.QueryServersInRoomAtEvent(req.Context(), &request, &response); err != nil {
- return util.ErrorResponse(err)
- }
- return util.JSONResponse{Code: http.StatusOK, JSON: &response}
- }),
- )
- servMux.Handle(
api.RoomserverQueryRoomVersionCapabilitiesPath,
common.MakeInternalAPI("QueryRoomVersionCapabilities", func(req *http.Request) util.JSONResponse {
var request api.QueryRoomVersionCapabilitiesRequest
diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go
index fa4f2062..ea1c5c4c 100644
--- a/roomserver/roomserver.go
+++ b/roomserver/roomserver.go
@@ -18,6 +18,7 @@ import (
"net/http"
"github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/gomatrixserverlib"
asQuery "github.com/matrix-org/dendrite/appservice/query"
"github.com/matrix-org/dendrite/common/basecomponent"
@@ -33,7 +34,7 @@ import (
// allowing other components running in the same process to hit the query the
// APIs directly instead of having to use HTTP.
func SetupRoomServerComponent(
- base *basecomponent.BaseDendrite,
+ base *basecomponent.BaseDendrite, keyRing gomatrixserverlib.JSONVerifier,
) (api.RoomserverAliasAPI, api.RoomserverInputAPI, api.RoomserverQueryAPI) {
roomserverDB, err := storage.Open(string(base.Cfg.Database.RoomServer))
if err != nil {
@@ -51,6 +52,11 @@ func SetupRoomServerComponent(
queryAPI := query.RoomserverQueryAPI{
DB: roomserverDB,
ImmutableCache: base.ImmutableCache,
+ ServerName: base.Cfg.Matrix.ServerName,
+ FedClient: base.CreateFederationClient(),
+ // TODO: We should have a key server so we don't keep adding components
+ // which talk to the same DB.
+ KeyRing: keyRing,
}
queryAPI.SetupHTTP(http.DefaultServeMux)
diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go
index a13c44d6..fb39eca6 100644
--- a/roomserver/storage/interface.go
+++ b/roomserver/storage/interface.go
@@ -31,7 +31,8 @@ type Database interface {
state []types.StateEntry,
) (types.StateSnapshotNID, error)
// Look up the state of a room at each event for a list of string event IDs.
- // Returns an error if there is an error talking to the database
+ // Returns an error if there is an error talking to the database.
+ // The length of []types.StateAtEvent is guaranteed to equal the length of eventIDs if no error is returned.
// Returns a types.MissingEventError if the room state for the event IDs aren't in the database
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
// Look up the numeric IDs for a list of string event types.