diff options
author | Kegsay <kegan@matrix.org> | 2020-06-11 19:50:40 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-06-11 19:50:40 +0100 |
commit | ec7718e7f842fa0fc5198489c904de21003db4c2 (patch) | |
tree | e267fe8dae227b274381213ef3e8a3f34fbf0f26 /syncapi | |
parent | 25cd2dd1c925fa0c1eeb27a3cd71e668344102ad (diff) |
Roomserver API changes (#1118)
* s/QueryBackfill/PerformBackfill/g
* OutputEvent now includes AddStateEvents which contain the full event of extra state events
* Only include adds not the current event
* Get adding state right
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/consumers/roomserver.go | 84 | ||||
-rw-r--r-- | syncapi/routing/messages.go | 6 |
2 files changed, 5 insertions, 85 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 055f7660..13597682 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -17,7 +17,6 @@ package consumers import ( "context" "encoding/json" - "fmt" "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal" @@ -105,17 +104,9 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( "room_version": ev.RoomVersion, }).Info("received event from roomserver") - addsStateEvents, err := s.lookupStateEvents(msg.AddsStateEventIDs, ev) - if err != nil { - log.WithFields(log.Fields{ - "event": string(ev.JSON()), - log.ErrorKey: err, - "add": msg.AddsStateEventIDs, - "del": msg.RemovesStateEventIDs, - }).Panicf("roomserver output log: state event lookup failure") - } + addsStateEvents := msg.AddsState() - ev, err = s.updateStateEvent(ev) + ev, err := s.updateStateEvent(ev) if err != nil { return err } @@ -185,63 +176,6 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( return nil } -// lookupStateEvents looks up the state events that are added by a new event. -func (s *OutputRoomEventConsumer) lookupStateEvents( - addsStateEventIDs []string, event gomatrixserverlib.HeaderedEvent, -) ([]gomatrixserverlib.HeaderedEvent, error) { - // Fast path if there aren't any new state events. - if len(addsStateEventIDs) == 0 { - return nil, nil - } - - // Fast path if the only state event added is the event itself. - if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { - return []gomatrixserverlib.HeaderedEvent{event}, nil - } - - // Check if this is re-adding a state events that we previously processed - // If we have previously received a state event it may still be in - // our event database. - result, err := s.db.Events(context.TODO(), addsStateEventIDs) - if err != nil { - return nil, err - } - missing := missingEventsFrom(result, addsStateEventIDs) - - // Check if event itself is being added. - for _, eventID := range missing { - if eventID == event.EventID() { - result = append(result, event) - break - } - } - missing = missingEventsFrom(result, addsStateEventIDs) - - if len(missing) == 0 { - return result, nil - } - - // At this point the missing events are neither the event itself nor are - // they present in our local database. Our only option is to fetch them - // from the roomserver using the query API. - eventReq := api.QueryEventsByIDRequest{EventIDs: missing} - var eventResp api.QueryEventsByIDResponse - if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil { - return nil, err - } - - result = append(result, eventResp.Events...) - missing = missingEventsFrom(result, addsStateEventIDs) - - if len(missing) != 0 { - return nil, fmt.Errorf( - "missing %d state events IDs at event %q", len(missing), event.EventID(), - ) - } - - return result, nil -} - func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) { var stateKey string if event.StateKey() == nil { @@ -270,17 +204,3 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.Heade event.Event, err = event.SetUnsigned(prev) return event, err } - -func missingEventsFrom(events []gomatrixserverlib.HeaderedEvent, required []string) []string { - have := map[string]bool{} - for _, event := range events { - have[event.EventID()] = true - } - var missing []string - for _, eventID := range required { - if !have[eventID] { - missing = append(missing, eventID) - } - } - return missing -} diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 8c897634..de5429db 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -375,15 +375,15 @@ func (e eventsByDepth) Less(i, j int) bool { // Returns an error if there was an issue with retrieving the list of servers in // the room or sending the request. func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) { - var res api.QueryBackfillResponse - err := r.rsAPI.QueryBackfill(context.Background(), &api.QueryBackfillRequest{ + var res api.PerformBackfillResponse + err := r.rsAPI.PerformBackfill(context.Background(), &api.PerformBackfillRequest{ RoomID: roomID, BackwardsExtremities: backwardsExtremities, Limit: limit, ServerName: r.cfg.Matrix.ServerName, }, &res) if err != nil { - return nil, fmt.Errorf("QueryBackfill failed: %w", err) + return nil, fmt.Errorf("PerformBackfill failed: %w", err) } util.GetLogger(r.ctx).WithField("new_events", len(res.Events)).Info("Storing new events from backfill") |