aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--appservice/consumers/roomserver.go51
-rw-r--r--clientapi/consumers/roomserver.go64
-rw-r--r--federationapi/routing/backfill.go8
-rw-r--r--federationapi/routing/send_test.go6
-rw-r--r--federationsender/consumers/roomserver.go6
-rw-r--r--publicroomsapi/consumers/roomserver.go17
-rw-r--r--roomserver/api/api.go6
-rw-r--r--roomserver/api/output.go27
-rw-r--r--roomserver/api/perform.go29
-rw-r--r--roomserver/api/query.go29
-rw-r--r--roomserver/internal/input_latest_events.go66
-rw-r--r--roomserver/internal/perform_backfill.go (renamed from roomserver/internal/query_backfill.go)0
-rw-r--r--roomserver/internal/query.go12
-rw-r--r--roomserver/inthttp/client.go18
-rw-r--r--roomserver/inthttp/server.go10
-rw-r--r--syncapi/consumers/roomserver.go84
-rw-r--r--syncapi/routing/messages.go6
17 files changed, 152 insertions, 287 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index bb4df790..1657fe54 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -91,60 +91,13 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil
}
- ev := output.NewRoomEvent.Event
- log.WithFields(log.Fields{
- "event_id": ev.EventID(),
- "room_id": ev.RoomID(),
- "type": ev.Type(),
- }).Info("appservice received an event from roomserver")
-
- missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev)
- if err != nil {
- return err
- }
- events := append(missingEvents, ev)
+ events := []gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event}
+ events = append(events, output.NewRoomEvent.AddStateEvents...)
// Send event to any relevant application services
return s.filterRoomserverEvents(context.TODO(), events)
}
-// lookupMissingStateEvents looks up the state events that are added by a new event,
-// and returns any not already present.
-func (s *OutputRoomEventConsumer) lookupMissingStateEvents(
- addsStateEventIDs []string, event gomatrixserverlib.HeaderedEvent,
-) ([]gomatrixserverlib.HeaderedEvent, error) {
- // Fast path if there aren't any new state events.
- if len(addsStateEventIDs) == 0 {
- return []gomatrixserverlib.HeaderedEvent{}, nil
- }
-
- // Fast path if the only state event added is the event itself.
- if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
- return []gomatrixserverlib.HeaderedEvent{}, nil
- }
-
- result := []gomatrixserverlib.HeaderedEvent{}
- missing := []string{}
- for _, id := range addsStateEventIDs {
- if id != event.EventID() {
- // If the event isn't the current one, add it to the list of events
- // to retrieve from the roomserver
- missing = append(missing, id)
- }
- }
-
- // Request the missing events from the roomserver
- eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
- var eventResp api.QueryEventsByIDResponse
- if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
- return nil, err
- }
-
- result = append(result, eventResp.Events...)
-
- return result, nil
-}
-
// filterRoomserverEvents takes in events and decides whether any of them need
// to be passed on to an external application service. It does this by checking
// each namespace of each registered application service, and if there is a
diff --git a/clientapi/consumers/roomserver.go b/clientapi/consumers/roomserver.go
index bd8ac1dc..caa028ba 100644
--- a/clientapi/consumers/roomserver.go
+++ b/clientapi/consumers/roomserver.go
@@ -84,63 +84,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil
}
- ev := output.NewRoomEvent.Event
- log.WithFields(log.Fields{
- "event_id": ev.EventID(),
- "room_id": ev.RoomID(),
- "type": ev.Type(),
- }).Info("received event from roomserver")
-
- events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev.Event)
- if err != nil {
- return err
- }
-
- return s.db.UpdateMemberships(context.TODO(), events, output.NewRoomEvent.RemovesStateEventIDs)
-}
-
-// lookupStateEvents looks up the state events that are added by a new event.
-func (s *OutputRoomEventConsumer) lookupStateEvents(
- addsStateEventIDs []string, event gomatrixserverlib.Event,
-) ([]gomatrixserverlib.Event, error) {
- // Fast path if there aren't any new state events.
- if len(addsStateEventIDs) == 0 {
- // If the event is a membership update (e.g. for a profile update), it won't
- // show up in AddsStateEventIDs, so we need to add it manually
- if event.Type() == "m.room.member" {
- return []gomatrixserverlib.Event{event}, nil
- }
- return nil, nil
- }
-
- // Fast path if the only state event added is the event itself.
- if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
- return []gomatrixserverlib.Event{event}, nil
- }
-
- result := []gomatrixserverlib.Event{}
- missing := []string{}
- for _, id := range addsStateEventIDs {
- // Append the current event in the results if its ID is in the events list
- if id == event.EventID() {
- result = append(result, event)
- } else {
- // If the event isn't the current one, add it to the list of events
- // to retrieve from the roomserver
- missing = append(missing, id)
- }
- }
-
- // Request the missing events from the roomserver
- eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
- var eventResp api.QueryEventsByIDResponse
- if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
- return nil, err
- }
-
- for _, headeredEvent := range eventResp.Events {
- result = append(result, headeredEvent.Event)
- }
-
- return result, nil
+ return s.db.UpdateMemberships(
+ context.TODO(),
+ gomatrixserverlib.UnwrapEventHeaders(output.NewRoomEvent.AddsState()),
+ output.NewRoomEvent.RemovesStateEventIDs,
+ )
}
diff --git a/federationapi/routing/backfill.go b/federationapi/routing/backfill.go
index 10bc6263..f906c73c 100644
--- a/federationapi/routing/backfill.go
+++ b/federationapi/routing/backfill.go
@@ -37,7 +37,7 @@ func Backfill(
roomID string,
cfg *config.Dendrite,
) util.JSONResponse {
- var res api.QueryBackfillResponse
+ var res api.PerformBackfillResponse
var eIDs []string
var limit string
var exists bool
@@ -68,7 +68,7 @@ func Backfill(
}
// Populate the request.
- req := api.QueryBackfillRequest{
+ req := api.PerformBackfillRequest{
RoomID: roomID,
// we don't know who the successors are for these events, which won't
// be a problem because we don't use that information when servicing /backfill requests,
@@ -87,8 +87,8 @@ func Backfill(
}
// Query the roomserver.
- if err = rsAPI.QueryBackfill(httpReq.Context(), &req, &res); err != nil {
- util.GetLogger(httpReq.Context()).WithError(err).Error("query.QueryBackfill failed")
+ if err = rsAPI.PerformBackfill(httpReq.Context(), &req, &res); err != nil {
+ util.GetLogger(httpReq.Context()).WithError(err).Error("query.PerformBackfill failed")
return jsonerror.InternalServerError()
}
diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go
index 9081a870..adae7c22 100644
--- a/federationapi/routing/send_test.go
+++ b/federationapi/routing/send_test.go
@@ -211,10 +211,10 @@ func (t *testRoomserverAPI) QueryStateAndAuthChain(
}
// Query a given amount (or less) of events prior to a given set of events.
-func (t *testRoomserverAPI) QueryBackfill(
+func (t *testRoomserverAPI) PerformBackfill(
ctx context.Context,
- request *api.QueryBackfillRequest,
- response *api.QueryBackfillResponse,
+ request *api.PerformBackfillRequest,
+ response *api.PerformBackfillResponse,
) error {
return fmt.Errorf("not implemented")
}
diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go
index 5f8a555b..a15937f9 100644
--- a/federationsender/consumers/roomserver.go
+++ b/federationsender/consumers/roomserver.go
@@ -131,11 +131,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
- addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event.Event)
- if err != nil {
- return err
- }
- addsJoinedHosts, err := joinedHostsFromEvents(addsStateEvents)
+ addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(ore.AddsState()))
if err != nil {
return err
}
diff --git a/publicroomsapi/consumers/roomserver.go b/publicroomsapi/consumers/roomserver.go
index c513d3b2..ba187cb1 100644
--- a/publicroomsapi/consumers/roomserver.go
+++ b/publicroomsapi/consumers/roomserver.go
@@ -78,20 +78,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil
}
- ev := output.NewRoomEvent.Event
- log.WithFields(log.Fields{
- "event_id": ev.EventID(),
- "room_id": ev.RoomID(),
- "type": ev.Type(),
- }).Info("received event from roomserver")
-
- addQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.AddsStateEventIDs}
- var addQueryRes api.QueryEventsByIDResponse
- if err := s.rsAPI.QueryEventsByID(context.TODO(), &addQueryReq, &addQueryRes); err != nil {
- log.Warn(err)
- return err
- }
-
remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs}
var remQueryRes api.QueryEventsByIDResponse
if err := s.rsAPI.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil {
@@ -100,9 +86,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}
var addQueryEvents, remQueryEvents []gomatrixserverlib.Event
- for _, headeredEvent := range addQueryRes.Events {
+ for _, headeredEvent := range output.NewRoomEvent.AddsState() {
addQueryEvents = append(addQueryEvents, headeredEvent.Event)
}
+ addQueryEvents = append(addQueryEvents, output.NewRoomEvent.Event.Unwrap())
for _, headeredEvent := range remQueryRes.Events {
remQueryEvents = append(remQueryEvents, headeredEvent.Event)
}
diff --git a/roomserver/api/api.go b/roomserver/api/api.go
index 3a2ad059..967f58ba 100644
--- a/roomserver/api/api.go
+++ b/roomserver/api/api.go
@@ -89,10 +89,10 @@ type RoomserverInternalAPI interface {
) error
// Query a given amount (or less) of events prior to a given set of events.
- QueryBackfill(
+ PerformBackfill(
ctx context.Context,
- request *QueryBackfillRequest,
- response *QueryBackfillResponse,
+ request *PerformBackfillRequest,
+ response *PerformBackfillResponse,
) error
// Asks for the default room version as preferred by the server.
diff --git a/roomserver/api/output.go b/roomserver/api/output.go
index 92a468a9..2bbd97af 100644
--- a/roomserver/api/output.go
+++ b/roomserver/api/output.go
@@ -63,6 +63,13 @@ type OutputNewRoomEvent struct {
// Together with RemovesStateEventIDs this allows the receiver to keep an up to date
// view of the current state of the room.
AddsStateEventIDs []string `json:"adds_state_event_ids"`
+ // All extra newly added state events. This is only set if there are *extra* events
+ // other than `Event`. This can happen when forks get merged because state resolution
+ // may decide a bunch of state events on one branch are now valid, so they will be
+ // present in this list. This is useful when trying to maintain the current state of a room
+ // as to do so you need to include both these events and `Event`.
+ AddStateEvents []gomatrixserverlib.HeaderedEvent `json:"adds_state_events"`
+
// The state event IDs that were removed from the state of the room by this event.
RemovesStateEventIDs []string `json:"removes_state_event_ids"`
// The ID of the event that was output before this event.
@@ -112,6 +119,26 @@ type OutputNewRoomEvent struct {
TransactionID *TransactionID `json:"transaction_id"`
}
+// AddsState returns all added state events from this event.
+//
+// This function is needed because `AddStateEvents` will not include a copy of
+// the original event to save space, so you cannot use that slice alone.
+// Instead, use this function which will add the original event if it is present
+// in `AddsStateEventIDs`.
+func (ore *OutputNewRoomEvent) AddsState() []gomatrixserverlib.HeaderedEvent {
+ includeOutputEvent := false
+ for _, id := range ore.AddsStateEventIDs {
+ if id == ore.Event.EventID() {
+ includeOutputEvent = true
+ break
+ }
+ }
+ if !includeOutputEvent {
+ return ore.AddStateEvents
+ }
+ return append(ore.AddStateEvents, ore.Event)
+}
+
// An OutputNewInviteEvent is written whenever an invite becomes active.
// Invite events can be received outside of an existing room so have to be
// tracked separately from the room events themselves.
diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go
index 1cf54144..3e5cae1b 100644
--- a/roomserver/api/perform.go
+++ b/roomserver/api/perform.go
@@ -2,6 +2,7 @@ package api
import (
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
)
type PerformJoinRequest struct {
@@ -22,3 +23,31 @@ type PerformLeaveRequest struct {
type PerformLeaveResponse struct {
}
+
+// PerformBackfillRequest is a request to PerformBackfill.
+type PerformBackfillRequest struct {
+ // The room to backfill
+ RoomID string `json:"room_id"`
+ // A map of backwards extremity event ID to a list of its prev_event IDs.
+ BackwardsExtremities map[string][]string `json:"backwards_extremities"`
+ // The maximum number of events to retrieve.
+ Limit int `json:"limit"`
+ // The server interested in the events.
+ ServerName gomatrixserverlib.ServerName `json:"server_name"`
+}
+
+// PrevEventIDs returns the prev_event IDs of all backwards extremities, de-duplicated in a lexicographically sorted order.
+func (r *PerformBackfillRequest) PrevEventIDs() []string {
+ var prevEventIDs []string
+ for _, pes := range r.BackwardsExtremities {
+ prevEventIDs = append(prevEventIDs, pes...)
+ }
+ prevEventIDs = util.UniqueStrings(prevEventIDs)
+ return prevEventIDs
+}
+
+// PerformBackfillResponse is a response to PerformBackfill.
+type PerformBackfillResponse struct {
+ // Missing events, arbritrary order.
+ Events []gomatrixserverlib.HeaderedEvent `json:"events"`
+}
diff --git a/roomserver/api/query.go b/roomserver/api/query.go
index c9a46ae9..b1525342 100644
--- a/roomserver/api/query.go
+++ b/roomserver/api/query.go
@@ -18,7 +18,6 @@ package api
import (
"github.com/matrix-org/gomatrixserverlib"
- "github.com/matrix-org/util"
)
// QueryLatestEventsAndStateRequest is a request to QueryLatestEventsAndState
@@ -204,34 +203,6 @@ type QueryStateAndAuthChainResponse struct {
AuthChainEvents []gomatrixserverlib.HeaderedEvent `json:"auth_chain_events"`
}
-// QueryBackfillRequest is a request to QueryBackfill.
-type QueryBackfillRequest struct {
- // The room to backfill
- RoomID string `json:"room_id"`
- // A map of backwards extremity event ID to a list of its prev_event IDs.
- BackwardsExtremities map[string][]string `json:"backwards_extremities"`
- // The maximum number of events to retrieve.
- Limit int `json:"limit"`
- // The server interested in the events.
- ServerName gomatrixserverlib.ServerName `json:"server_name"`
-}
-
-// PrevEventIDs returns the prev_event IDs of all backwards extremities, de-duplicated in a lexicographically sorted order.
-func (r *QueryBackfillRequest) PrevEventIDs() []string {
- var prevEventIDs []string
- for _, pes := range r.BackwardsExtremities {
- prevEventIDs = append(prevEventIDs, pes...)
- }
- prevEventIDs = util.UniqueStrings(prevEventIDs)
- return prevEventIDs
-}
-
-// QueryBackfillResponse is a response to QueryBackfill.
-type QueryBackfillResponse struct {
- // Missing events, arbritrary order.
- Events []gomatrixserverlib.HeaderedEvent `json:"events"`
-}
-
// QueryRoomVersionCapabilitiesRequest asks for the default room version
type QueryRoomVersionCapabilitiesRequest struct{}
diff --git a/roomserver/internal/input_latest_events.go b/roomserver/internal/input_latest_events.go
index aea85ca9..e69307ad 100644
--- a/roomserver/internal/input_latest_events.go
+++ b/roomserver/internal/input_latest_events.go
@@ -19,6 +19,7 @@ package internal
import (
"bytes"
"context"
+ "fmt"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -310,24 +311,11 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
TransactionID: u.transactionID,
}
- var stateEventNIDs []types.EventNID
- for _, entry := range u.added {
- stateEventNIDs = append(stateEventNIDs, entry.EventNID)
- }
- for _, entry := range u.removed {
- stateEventNIDs = append(stateEventNIDs, entry.EventNID)
- }
- for _, entry := range u.stateBeforeEventRemoves {
- stateEventNIDs = append(stateEventNIDs, entry.EventNID)
- }
- for _, entry := range u.stateBeforeEventAdds {
- stateEventNIDs = append(stateEventNIDs, entry.EventNID)
- }
- stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))]
- eventIDMap, err := u.api.DB.EventIDs(u.ctx, stateEventNIDs)
+ eventIDMap, err := u.stateEventMap()
if err != nil {
return nil, err
}
+
for _, entry := range u.added {
ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID])
}
@@ -342,12 +330,60 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
}
ore.SendAsServer = u.sendAsServer
+ // include extra state events if they were added as nearly every downstream component will care about it
+ // and we'd rather not have them all hit QueryEventsByID at the same time!
+ if len(ore.AddsStateEventIDs) > 0 {
+ ore.AddStateEvents, err = u.extraEventsForIDs(roomVersion, ore.AddsStateEventIDs)
+ if err != nil {
+ return nil, fmt.Errorf("failed to load add_state_events from db: %w", err)
+ }
+ }
+
return &api.OutputEvent{
Type: api.OutputTypeNewRoomEvent,
NewRoomEvent: &ore,
}, nil
}
+// extraEventsForIDs returns the full events for the event IDs given, but does not include the current event being
+// updated.
+func (u *latestEventsUpdater) extraEventsForIDs(roomVersion gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) {
+ var extraEventIDs []string
+ for _, e := range eventIDs {
+ if e == u.event.EventID() {
+ continue
+ }
+ extraEventIDs = append(extraEventIDs, e)
+ }
+ if len(extraEventIDs) == 0 {
+ return nil, nil
+ }
+ extraEvents, err := u.api.DB.EventsFromIDs(u.ctx, extraEventIDs)
+ if err != nil {
+ return nil, err
+ }
+ var h []gomatrixserverlib.HeaderedEvent
+ for _, e := range extraEvents {
+ h = append(h, e.Headered(roomVersion))
+ }
+ return h, nil
+}
+
+// retrieve an event nid -> event ID map for all events that need updating
+func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error) {
+ var stateEventNIDs []types.EventNID
+ var allStateEntries []types.StateEntry
+ allStateEntries = append(allStateEntries, u.added...)
+ allStateEntries = append(allStateEntries, u.removed...)
+ allStateEntries = append(allStateEntries, u.stateBeforeEventRemoves...)
+ allStateEntries = append(allStateEntries, u.stateBeforeEventAdds...)
+ for _, entry := range allStateEntries {
+ stateEventNIDs = append(stateEventNIDs, entry.EventNID)
+ }
+ stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))]
+ return u.api.DB.EventIDs(u.ctx, stateEventNIDs)
+}
+
type eventNIDSorter []types.EventNID
func (s eventNIDSorter) Len() int { return len(s) }
diff --git a/roomserver/internal/query_backfill.go b/roomserver/internal/perform_backfill.go
index 23ae9455..23ae9455 100644
--- a/roomserver/internal/query_backfill.go
+++ b/roomserver/internal/perform_backfill.go
diff --git a/roomserver/internal/query.go b/roomserver/internal/query.go
index aea93388..375ddc23 100644
--- a/roomserver/internal/query.go
+++ b/roomserver/internal/query.go
@@ -441,11 +441,11 @@ func (r *RoomserverInternalAPI) QueryMissingEvents(
return err
}
-// QueryBackfill implements api.RoomServerQueryAPI
-func (r *RoomserverInternalAPI) QueryBackfill(
+// PerformBackfill implements api.RoomServerQueryAPI
+func (r *RoomserverInternalAPI) PerformBackfill(
ctx context.Context,
- request *api.QueryBackfillRequest,
- response *api.QueryBackfillResponse,
+ request *api.PerformBackfillRequest,
+ response *api.PerformBackfillResponse,
) error {
// if we are requesting the backfill then we need to do a federation hit
// TODO: we could be more sensible and fetch as many events we already have then request the rest
@@ -489,7 +489,7 @@ func (r *RoomserverInternalAPI) QueryBackfill(
return err
}
-func (r *RoomserverInternalAPI) backfillViaFederation(ctx context.Context, req *api.QueryBackfillRequest, res *api.QueryBackfillResponse) error {
+func (r *RoomserverInternalAPI) backfillViaFederation(ctx context.Context, req *api.PerformBackfillRequest, res *api.PerformBackfillResponse) error {
roomVer, err := r.DB.GetRoomVersionForRoom(ctx, req.RoomID)
if err != nil {
return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err)
@@ -647,7 +647,7 @@ func (r *RoomserverInternalAPI) scanEventTree(
var pre string
// TODO: add tests for this function to ensure it meets the contract that callers expect (and doc what that is supposed to be)
- // Currently, callers like QueryBackfill will call scanEventTree with a pre-populated `visited` map, assuming that by doing
+ // Currently, callers like PerformBackfill will call scanEventTree with a pre-populated `visited` map, assuming that by doing
// so means that the events in that map will NOT be returned from this function. That is not currently true, resulting in
// duplicate events being sent in response to /backfill requests.
initialIgnoreList := make(map[string]bool, len(visited))
diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go
index 6f5e882e..1244300d 100644
--- a/roomserver/inthttp/client.go
+++ b/roomserver/inthttp/client.go
@@ -24,8 +24,9 @@ const (
RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents"
// Perform operations
- RoomserverPerformJoinPath = "/roomserver/performJoin"
- RoomserverPerformLeavePath = "/roomserver/performLeave"
+ RoomserverPerformJoinPath = "/roomserver/performJoin"
+ RoomserverPerformLeavePath = "/roomserver/performLeave"
+ RoomserverPerformBackfillPath = "/roomserver/performBackfill"
// Query operations
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"
@@ -36,7 +37,6 @@ const (
RoomserverQueryServerAllowedToSeeEventPath = "/roomserver/queryServerAllowedToSeeEvent"
RoomserverQueryMissingEventsPath = "/roomserver/queryMissingEvents"
RoomserverQueryStateAndAuthChainPath = "/roomserver/queryStateAndAuthChain"
- RoomserverQueryBackfillPath = "/roomserver/queryBackfill"
RoomserverQueryRoomVersionCapabilitiesPath = "/roomserver/queryRoomVersionCapabilities"
RoomserverQueryRoomVersionForRoomPath = "/roomserver/queryRoomVersionForRoom"
)
@@ -274,16 +274,16 @@ func (h *httpRoomserverInternalAPI) QueryStateAndAuthChain(
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
-// QueryBackfill implements RoomServerQueryAPI
-func (h *httpRoomserverInternalAPI) QueryBackfill(
+// PerformBackfill implements RoomServerQueryAPI
+func (h *httpRoomserverInternalAPI) PerformBackfill(
ctx context.Context,
- request *api.QueryBackfillRequest,
- response *api.QueryBackfillResponse,
+ request *api.PerformBackfillRequest,
+ response *api.PerformBackfillResponse,
) error {
- span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBackfill")
+ span, ctx := opentracing.StartSpanFromContext(ctx, "PerformBackfill")
defer span.Finish()
- apiURL := h.roomserverURL + RoomserverQueryBackfillPath
+ apiURL := h.roomserverURL + RoomserverPerformBackfillPath
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
diff --git a/roomserver/inthttp/server.go b/roomserver/inthttp/server.go
index 3a13ce37..8ac815f3 100644
--- a/roomserver/inthttp/server.go
+++ b/roomserver/inthttp/server.go
@@ -165,14 +165,14 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
}),
)
internalAPIMux.Handle(
- RoomserverQueryBackfillPath,
- internal.MakeInternalAPI("QueryBackfill", func(req *http.Request) util.JSONResponse {
- var request api.QueryBackfillRequest
- var response api.QueryBackfillResponse
+ RoomserverPerformBackfillPath,
+ internal.MakeInternalAPI("PerformBackfill", func(req *http.Request) util.JSONResponse {
+ var request api.PerformBackfillRequest
+ var response api.PerformBackfillResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
- if err := r.QueryBackfill(req.Context(), &request, &response); err != nil {
+ if err := r.PerformBackfill(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 055f7660..13597682 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -17,7 +17,6 @@ package consumers
import (
"context"
"encoding/json"
- "fmt"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal"
@@ -105,17 +104,9 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
"room_version": ev.RoomVersion,
}).Info("received event from roomserver")
- addsStateEvents, err := s.lookupStateEvents(msg.AddsStateEventIDs, ev)
- if err != nil {
- log.WithFields(log.Fields{
- "event": string(ev.JSON()),
- log.ErrorKey: err,
- "add": msg.AddsStateEventIDs,
- "del": msg.RemovesStateEventIDs,
- }).Panicf("roomserver output log: state event lookup failure")
- }
+ addsStateEvents := msg.AddsState()
- ev, err = s.updateStateEvent(ev)
+ ev, err := s.updateStateEvent(ev)
if err != nil {
return err
}
@@ -185,63 +176,6 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
return nil
}
-// lookupStateEvents looks up the state events that are added by a new event.
-func (s *OutputRoomEventConsumer) lookupStateEvents(
- addsStateEventIDs []string, event gomatrixserverlib.HeaderedEvent,
-) ([]gomatrixserverlib.HeaderedEvent, error) {
- // Fast path if there aren't any new state events.
- if len(addsStateEventIDs) == 0 {
- return nil, nil
- }
-
- // Fast path if the only state event added is the event itself.
- if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
- return []gomatrixserverlib.HeaderedEvent{event}, nil
- }
-
- // Check if this is re-adding a state events that we previously processed
- // If we have previously received a state event it may still be in
- // our event database.
- result, err := s.db.Events(context.TODO(), addsStateEventIDs)
- if err != nil {
- return nil, err
- }
- missing := missingEventsFrom(result, addsStateEventIDs)
-
- // Check if event itself is being added.
- for _, eventID := range missing {
- if eventID == event.EventID() {
- result = append(result, event)
- break
- }
- }
- missing = missingEventsFrom(result, addsStateEventIDs)
-
- if len(missing) == 0 {
- return result, nil
- }
-
- // At this point the missing events are neither the event itself nor are
- // they present in our local database. Our only option is to fetch them
- // from the roomserver using the query API.
- eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
- var eventResp api.QueryEventsByIDResponse
- if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
- return nil, err
- }
-
- result = append(result, eventResp.Events...)
- missing = missingEventsFrom(result, addsStateEventIDs)
-
- if len(missing) != 0 {
- return nil, fmt.Errorf(
- "missing %d state events IDs at event %q", len(missing), event.EventID(),
- )
- }
-
- return result, nil
-}
-
func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) {
var stateKey string
if event.StateKey() == nil {
@@ -270,17 +204,3 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.Heade
event.Event, err = event.SetUnsigned(prev)
return event, err
}
-
-func missingEventsFrom(events []gomatrixserverlib.HeaderedEvent, required []string) []string {
- have := map[string]bool{}
- for _, event := range events {
- have[event.EventID()] = true
- }
- var missing []string
- for _, eventID := range required {
- if !have[eventID] {
- missing = append(missing, eventID)
- }
- }
- return missing
-}
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index 8c897634..de5429db 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -375,15 +375,15 @@ func (e eventsByDepth) Less(i, j int) bool {
// Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request.
func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
- var res api.QueryBackfillResponse
- err := r.rsAPI.QueryBackfill(context.Background(), &api.QueryBackfillRequest{
+ var res api.PerformBackfillResponse
+ err := r.rsAPI.PerformBackfill(context.Background(), &api.PerformBackfillRequest{
RoomID: roomID,
BackwardsExtremities: backwardsExtremities,
Limit: limit,
ServerName: r.cfg.Matrix.ServerName,
}, &res)
if err != nil {
- return nil, fmt.Errorf("QueryBackfill failed: %w", err)
+ return nil, fmt.Errorf("PerformBackfill failed: %w", err)
}
util.GetLogger(r.ctx).WithField("new_events", len(res.Events)).Info("Storing new events from backfill")