aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-05-09 16:19:35 +0100
committerGitHub <noreply@github.com>2022-05-09 16:19:35 +0100
commit79da75d483d3ee554722000975e13776e4e8a656 (patch)
tree41b1ecfebe8147f112fd7f95b785137d87d402cc
parent1a7f4c8aa978d7e2f6046b6628ecf523460eee28 (diff)
Federation consumer `adds_state_event_ids` tweak (#2441)
* Don't ask roomserver for events we already have in federation API * Check number of events returned is as expected * Preallocate array * Improve shape a bit
-rw-r--r--federationapi/consumers/roomserver.go25
-rw-r--r--roomserver/api/output.go13
-rw-r--r--syncapi/consumers/roomserver.go23
3 files changed, 28 insertions, 33 deletions
diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go
index ff2c8e5d..80317ee6 100644
--- a/federationapi/consumers/roomserver.go
+++ b/federationapi/consumers/roomserver.go
@@ -146,28 +146,25 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee
// processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
- eventsRes := &api.QueryEventsByIDResponse{}
- if len(ore.AddsStateEventIDs) > 0 {
+ addsStateEvents, missingEventIDs := ore.NeededStateEventIDs()
+
+ // Ask the roomserver and add in the rest of the results into the set.
+ // Finally, work out if there are any more events missing.
+ if len(missingEventIDs) > 0 {
eventsReq := &api.QueryEventsByIDRequest{
- EventIDs: ore.AddsStateEventIDs,
+ EventIDs: missingEventIDs,
}
+ eventsRes := &api.QueryEventsByIDResponse{}
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err)
}
-
- found := false
- for _, event := range eventsRes.Events {
- if event.EventID() == ore.Event.EventID() {
- found = true
- break
- }
- }
- if !found {
- eventsRes.Events = append(eventsRes.Events, ore.Event)
+ if len(eventsRes.Events) != len(missingEventIDs) {
+ return fmt.Errorf("missing state events")
}
+ addsStateEvents = append(addsStateEvents, eventsRes.Events...)
}
- addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(eventsRes.Events))
+ addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(addsStateEvents))
if err != nil {
return err
}
diff --git a/roomserver/api/output.go b/roomserver/api/output.go
index 767611ec..a82bf870 100644
--- a/roomserver/api/output.go
+++ b/roomserver/api/output.go
@@ -163,6 +163,19 @@ type OutputNewRoomEvent struct {
TransactionID *TransactionID `json:"transaction_id,omitempty"`
}
+func (o *OutputNewRoomEvent) NeededStateEventIDs() ([]*gomatrixserverlib.HeaderedEvent, []string) {
+ addsStateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, 1)
+ missingEventIDs := make([]string, 0, len(o.AddsStateEventIDs))
+ for _, eventID := range o.AddsStateEventIDs {
+ if eventID != o.Event.EventID() {
+ missingEventIDs = append(missingEventIDs, eventID)
+ } else {
+ addsStateEvents = append(addsStateEvents, o.Event)
+ }
+ }
+ return addsStateEvents, missingEventIDs
+}
+
// An OutputOldRoomEvent is written when the roomserver receives an old event.
// This will typically happen as a result of getting either missing events
// or backfilling. Downstream components may wish to send these events to
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index e1c2ea82..63bde816 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -154,35 +154,20 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
ctx context.Context, msg api.OutputNewRoomEvent,
) error {
ev := msg.Event
- addsStateEvents := []*gomatrixserverlib.HeaderedEvent{}
+ addsStateEvents, missingEventIDs := msg.NeededStateEventIDs()
// Work out the list of events we need to find out about. Either
// they will be the event supplied in the request, we will find it
// in the sync API database or we'll need to ask the roomserver.
knownEventIDs := make(map[string]bool, len(msg.AddsStateEventIDs))
- for _, eventID := range msg.AddsStateEventIDs {
- if eventID == ev.EventID() {
- knownEventIDs[eventID] = true
- addsStateEvents = append(addsStateEvents, ev)
- } else {
- knownEventIDs[eventID] = false
- }
- }
-
- // Work out which events we want to look up in the sync API database.
- // At this stage the only event that should be excluded is the event
- // supplied in the request, if it appears in the adds_state_event_ids.
- missingEventIDs := make([]string, 0, len(msg.AddsStateEventIDs))
- for eventID, known := range knownEventIDs {
- if !known {
- missingEventIDs = append(missingEventIDs, eventID)
- }
+ for _, eventID := range missingEventIDs {
+ knownEventIDs[eventID] = false
}
// Look the events up in the database. If we know them, add them into
// the set of adds state events.
if len(missingEventIDs) > 0 {
- alreadyKnown, err := s.db.Events(ctx, msg.AddsStateEventIDs)
+ alreadyKnown, err := s.db.Events(ctx, missingEventIDs)
if err != nil {
return fmt.Errorf("s.db.Events: %w", err)
}