aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-05-20 16:04:31 +0100
committerGitHub <noreply@github.com>2020-05-20 16:04:31 +0100
commit1414922026636e71ba61acc5c243e9a6cf914981 (patch)
treea1946c1b3f64da919cb79aa7468ed41e02f52668 /syncapi
parent260e69d138fb82003b0a7e77aeb5cac6b3281777 (diff)
sytest: Make 'Inbound federation can backfill events' pass (#1051)
* sytest: Make 'Inbound federation can backfill events' pass This breaks 'Outbound federation can backfill events' because now we are returning the right number of events, which the previous test was relying on. Previously, /messages was backfilling the membership event, causing the test to pass. Now we are no longer backfilling the membership event due to the change in this commit, causing the test to fail. The test should instead be returning the membership event locally from synacpis database, but it doesn't do it fast enough, resulting in a no-op /sync response with a next_batch=s0_0 which will never pick up the local membership event when it rolls in. The test does attempt to retry, but doesn't take the new next_batch=s1_0 resulting in it missing from the /messages response. * Linting
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/routing/messages.go11
-rw-r--r--syncapi/storage/interface.go5
-rw-r--r--syncapi/storage/postgres/backwards_extremities_table.go13
-rw-r--r--syncapi/storage/shared/syncserver.go3
-rw-r--r--syncapi/storage/sqlite3/backwards_extremities_table.go13
-rw-r--r--syncapi/storage/tables/interface.go4
6 files changed, 26 insertions, 23 deletions
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index 811188cc..88e16fe5 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -205,6 +205,7 @@ func (r *messagesReq) retrieveEvents() (
}
var events []gomatrixserverlib.HeaderedEvent
+ util.GetLogger(r.ctx).WithField("start", start).WithField("end", end).Infof("Fetched %d events locally", len(streamEvents))
// There can be two reasons for streamEvents to be empty: either we've
// reached the oldest event in the room (or the most recent one, depending
@@ -373,13 +374,13 @@ func (e eventsByDepth) Less(i, j int) bool {
// event, or if there is no remote homeserver to contact.
// Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request.
-func (r *messagesReq) backfill(roomID string, fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
+func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
var res api.QueryBackfillResponse
err := r.rsAPI.QueryBackfill(context.Background(), &api.QueryBackfillRequest{
- RoomID: roomID,
- EarliestEventsIDs: fromEventIDs,
- Limit: limit,
- ServerName: r.cfg.Matrix.ServerName,
+ RoomID: roomID,
+ BackwardsExtremities: backwardsExtremities,
+ Limit: limit,
+ ServerName: r.cfg.Matrix.ServerName,
}, &res)
if err != nil {
return nil, fmt.Errorf("QueryBackfill failed: %w", err)
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index eba008b3..121e94c7 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -94,9 +94,8 @@ type Database interface {
GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error)
// EventPositionInTopology returns the depth and stream position of the given event.
EventPositionInTopology(ctx context.Context, eventID string) (types.TopologyToken, error)
- // BackwardExtremitiesForRoom returns the event IDs of all of the backward
- // extremities we know of for a given room.
- BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities []string, err error)
+ // BackwardExtremitiesForRoom returns a map of backwards extremity event ID to a list of its prev_events.
+ BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities map[string][]string, err error)
// MaxTopologicalPosition returns the highest topological position for a given room.
MaxTopologicalPosition(ctx context.Context, roomID string) (types.TopologyToken, error)
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
diff --git a/syncapi/storage/postgres/backwards_extremities_table.go b/syncapi/storage/postgres/backwards_extremities_table.go
index 3dd1b234..fa498a40 100644
--- a/syncapi/storage/postgres/backwards_extremities_table.go
+++ b/syncapi/storage/postgres/backwards_extremities_table.go
@@ -41,7 +41,7 @@ const insertBackwardExtremitySQL = "" +
" ON CONFLICT DO NOTHING"
const selectBackwardExtremitiesForRoomSQL = "" +
- "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
+ "SELECT event_id, prev_event_id FROM syncapi_backward_extremities WHERE room_id = $1"
const deleteBackwardExtremitySQL = "" +
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
@@ -79,23 +79,24 @@ func (s *backwardExtremitiesStatements) InsertsBackwardExtremity(
func (s *backwardExtremitiesStatements) SelectBackwardExtremitiesForRoom(
ctx context.Context, roomID string,
-) (eventIDs []string, err error) {
+) (bwExtrems map[string][]string, err error) {
rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
if err != nil {
return
}
defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed")
+ bwExtrems = make(map[string][]string)
for rows.Next() {
var eID string
- if err = rows.Scan(&eID); err != nil {
+ var prevEventID string
+ if err = rows.Scan(&eID, &prevEventID); err != nil {
return
}
-
- eventIDs = append(eventIDs, eID)
+ bwExtrems[eID] = append(bwExtrems[eID], prevEventID)
}
- return eventIDs, rows.Err()
+ return bwExtrems, rows.Err()
}
func (s *backwardExtremitiesStatements) DeleteBackwardExtremity(
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index be1f9c7a..543e5b4a 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -310,6 +310,7 @@ func (d *Database) updateRoomState(
}
membership = &value
}
+
if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil {
return err
}
@@ -367,7 +368,7 @@ func (d *Database) SyncPosition(ctx context.Context) (tok types.StreamingToken,
func (d *Database) BackwardExtremitiesForRoom(
ctx context.Context, roomID string,
-) (backwardExtremities []string, err error) {
+) (backwardExtremities map[string][]string, err error) {
return d.BackwardExtremities.SelectBackwardExtremitiesForRoom(ctx, roomID)
}
diff --git a/syncapi/storage/sqlite3/backwards_extremities_table.go b/syncapi/storage/sqlite3/backwards_extremities_table.go
index a172f6bf..c5d9cae5 100644
--- a/syncapi/storage/sqlite3/backwards_extremities_table.go
+++ b/syncapi/storage/sqlite3/backwards_extremities_table.go
@@ -41,7 +41,7 @@ const insertBackwardExtremitySQL = "" +
" ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING"
const selectBackwardExtremitiesForRoomSQL = "" +
- "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
+ "SELECT event_id, prev_event_id FROM syncapi_backward_extremities WHERE room_id = $1"
const deleteBackwardExtremitySQL = "" +
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
@@ -79,23 +79,24 @@ func (s *backwardExtremitiesStatements) InsertsBackwardExtremity(
func (s *backwardExtremitiesStatements) SelectBackwardExtremitiesForRoom(
ctx context.Context, roomID string,
-) (eventIDs []string, err error) {
+) (bwExtrems map[string][]string, err error) {
rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
if err != nil {
return
}
defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed")
+ bwExtrems = make(map[string][]string)
for rows.Next() {
var eID string
- if err = rows.Scan(&eID); err != nil {
+ var prevEventID string
+ if err = rows.Scan(&eID, &prevEventID); err != nil {
return
}
-
- eventIDs = append(eventIDs, eID)
+ bwExtrems[eID] = append(bwExtrems[eID], prevEventID)
}
- return eventIDs, rows.Err()
+ return bwExtrems, rows.Err()
}
func (s *backwardExtremitiesStatements) DeleteBackwardExtremity(
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index f31bdc2e..bc3b6941 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -89,8 +89,8 @@ type CurrentRoomState interface {
type BackwardsExtremities interface {
// InsertsBackwardExtremity inserts a new backwards extremity.
InsertsBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string) (err error)
- // SelectBackwardExtremitiesForRoom retrieves all backwards extremities for the room.
- SelectBackwardExtremitiesForRoom(ctx context.Context, roomID string) (eventIDs []string, err error)
+ // SelectBackwardExtremitiesForRoom retrieves all backwards extremities for the room, as a map of event_id to list of prev_event_ids.
+ SelectBackwardExtremitiesForRoom(ctx context.Context, roomID string) (bwExtrems map[string][]string, err error)
// DeleteBackwardExtremity removes a backwards extremity for a room, if one existed.
DeleteBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, knownEventID string) (err error)
}