aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-06-11 19:50:40 +0100
committerGitHub <noreply@github.com>2020-06-11 19:50:40 +0100
commitec7718e7f842fa0fc5198489c904de21003db4c2 (patch)
treee267fe8dae227b274381213ef3e8a3f34fbf0f26 /syncapi
parent25cd2dd1c925fa0c1eeb27a3cd71e668344102ad (diff)
Roomserver API changes (#1118)
* s/QueryBackfill/PerformBackfill/g * OutputEvent now includes AddStateEvents which contain the full event of extra state events * Only include adds not the current event * Get adding state right
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/roomserver.go84
-rw-r--r--syncapi/routing/messages.go6
2 files changed, 5 insertions, 85 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 055f7660..13597682 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -17,7 +17,6 @@ package consumers
import (
"context"
"encoding/json"
- "fmt"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal"
@@ -105,17 +104,9 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
"room_version": ev.RoomVersion,
}).Info("received event from roomserver")
- addsStateEvents, err := s.lookupStateEvents(msg.AddsStateEventIDs, ev)
- if err != nil {
- log.WithFields(log.Fields{
- "event": string(ev.JSON()),
- log.ErrorKey: err,
- "add": msg.AddsStateEventIDs,
- "del": msg.RemovesStateEventIDs,
- }).Panicf("roomserver output log: state event lookup failure")
- }
+ addsStateEvents := msg.AddsState()
- ev, err = s.updateStateEvent(ev)
+ ev, err := s.updateStateEvent(ev)
if err != nil {
return err
}
@@ -185,63 +176,6 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
return nil
}
-// lookupStateEvents looks up the state events that are added by a new event.
-func (s *OutputRoomEventConsumer) lookupStateEvents(
- addsStateEventIDs []string, event gomatrixserverlib.HeaderedEvent,
-) ([]gomatrixserverlib.HeaderedEvent, error) {
- // Fast path if there aren't any new state events.
- if len(addsStateEventIDs) == 0 {
- return nil, nil
- }
-
- // Fast path if the only state event added is the event itself.
- if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
- return []gomatrixserverlib.HeaderedEvent{event}, nil
- }
-
- // Check if this is re-adding a state events that we previously processed
- // If we have previously received a state event it may still be in
- // our event database.
- result, err := s.db.Events(context.TODO(), addsStateEventIDs)
- if err != nil {
- return nil, err
- }
- missing := missingEventsFrom(result, addsStateEventIDs)
-
- // Check if event itself is being added.
- for _, eventID := range missing {
- if eventID == event.EventID() {
- result = append(result, event)
- break
- }
- }
- missing = missingEventsFrom(result, addsStateEventIDs)
-
- if len(missing) == 0 {
- return result, nil
- }
-
- // At this point the missing events are neither the event itself nor are
- // they present in our local database. Our only option is to fetch them
- // from the roomserver using the query API.
- eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
- var eventResp api.QueryEventsByIDResponse
- if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
- return nil, err
- }
-
- result = append(result, eventResp.Events...)
- missing = missingEventsFrom(result, addsStateEventIDs)
-
- if len(missing) != 0 {
- return nil, fmt.Errorf(
- "missing %d state events IDs at event %q", len(missing), event.EventID(),
- )
- }
-
- return result, nil
-}
-
func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) {
var stateKey string
if event.StateKey() == nil {
@@ -270,17 +204,3 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.Heade
event.Event, err = event.SetUnsigned(prev)
return event, err
}
-
-func missingEventsFrom(events []gomatrixserverlib.HeaderedEvent, required []string) []string {
- have := map[string]bool{}
- for _, event := range events {
- have[event.EventID()] = true
- }
- var missing []string
- for _, eventID := range required {
- if !have[eventID] {
- missing = append(missing, eventID)
- }
- }
- return missing
-}
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index 8c897634..de5429db 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -375,15 +375,15 @@ func (e eventsByDepth) Less(i, j int) bool {
// Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request.
func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
- var res api.QueryBackfillResponse
- err := r.rsAPI.QueryBackfill(context.Background(), &api.QueryBackfillRequest{
+ var res api.PerformBackfillResponse
+ err := r.rsAPI.PerformBackfill(context.Background(), &api.PerformBackfillRequest{
RoomID: roomID,
BackwardsExtremities: backwardsExtremities,
Limit: limit,
ServerName: r.cfg.Matrix.ServerName,
}, &res)
if err != nil {
- return nil, fmt.Errorf("QueryBackfill failed: %w", err)
+ return nil, fmt.Errorf("PerformBackfill failed: %w", err)
}
util.GetLogger(r.ctx).WithField("new_events", len(res.Events)).Info("Storing new events from backfill")