aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2023-01-17 10:08:23 +0100
committerGitHub <noreply@github.com>2023-01-17 10:08:23 +0100
commit0d0280cf5ff71ec975b17d0f6dadcae7e46574b5 (patch)
treea13e07f0f775029c1929792b072841657b3a5bcf /syncapi/storage
parent8582c7520abbfca680da9ba16e40a9a92b9fd21c (diff)
`/sync` performance optimizations (#2927)
Since #2849 there is no limit for the current state we fetch to calculate history visibility. In large rooms this can cause us to fetch thousands of membership events we don't really care about. This now only gets the state event types and senders in our timeline, which should significantly reduce the amount of events we fetch from the database. Also removes `MaxTopologicalPosition`, as it is an unnecessary DB call, given we use the result in `topological_position < $1` calls.
Diffstat (limited to 'syncapi/storage')
-rw-r--r--syncapi/storage/interface.go2
-rw-r--r--syncapi/storage/postgres/output_room_events_topology_table.go19
-rw-r--r--syncapi/storage/shared/storage_sync.go17
-rw-r--r--syncapi/storage/sqlite3/output_room_events_topology_table.go16
-rw-r--r--syncapi/storage/storage_test.go88
-rw-r--r--syncapi/storage/tables/interface.go2
6 files changed, 86 insertions, 58 deletions
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index 4e22f8a6..a4ba8232 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -84,8 +84,6 @@ type DatabaseTransaction interface {
EventPositionInTopology(ctx context.Context, eventID string) (types.TopologyToken, error)
// BackwardExtremitiesForRoom returns a map of backwards extremity event ID to a list of its prev_events.
BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities map[string][]string, err error)
- // MaxTopologicalPosition returns the highest topological position for a given room.
- MaxTopologicalPosition(ctx context.Context, roomID string) (types.TopologyToken, error)
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
// matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event.
diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go
index 6fab900e..d0e99f26 100644
--- a/syncapi/storage/postgres/output_room_events_topology_table.go
+++ b/syncapi/storage/postgres/output_room_events_topology_table.go
@@ -65,14 +65,6 @@ const selectPositionInTopologySQL = "" +
"SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" +
" WHERE event_id = $1"
- // Select the max topological position for the room, then sort by stream position and take the highest,
- // returning both topological and stream positions.
-const selectMaxPositionInTopologySQL = "" +
- "SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" +
- " WHERE topological_position=(" +
- "SELECT MAX(topological_position) FROM syncapi_output_room_events_topology WHERE room_id=$1" +
- ") ORDER BY stream_position DESC LIMIT 1"
-
const selectStreamToTopologicalPositionAscSQL = "" +
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position >= $2 ORDER BY topological_position ASC LIMIT 1;"
@@ -84,7 +76,6 @@ type outputRoomEventsTopologyStatements struct {
selectEventIDsInRangeASCStmt *sql.Stmt
selectEventIDsInRangeDESCStmt *sql.Stmt
selectPositionInTopologyStmt *sql.Stmt
- selectMaxPositionInTopologyStmt *sql.Stmt
selectStreamToTopologicalPositionAscStmt *sql.Stmt
selectStreamToTopologicalPositionDescStmt *sql.Stmt
}
@@ -107,9 +98,6 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil {
return nil, err
}
- if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
- return nil, err
- }
if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil {
return nil, err
}
@@ -189,10 +177,3 @@ func (s *outputRoomEventsTopologyStatements) SelectStreamToTopologicalPosition(
}
return
}
-
-func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
- ctx context.Context, txn *sql.Tx, roomID string,
-) (pos types.StreamPosition, spos types.StreamPosition, err error) {
- err = sqlutil.TxStmt(txn, s.selectMaxPositionInTopologyStmt).QueryRowContext(ctx, roomID).Scan(&pos, &spos)
- return
-}
diff --git a/syncapi/storage/shared/storage_sync.go b/syncapi/storage/shared/storage_sync.go
index c6933486..7b07cac5 100644
--- a/syncapi/storage/shared/storage_sync.go
+++ b/syncapi/storage/shared/storage_sync.go
@@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
+ "math"
"github.com/matrix-org/gomatrixserverlib"
"github.com/tidwall/gjson"
@@ -269,16 +270,6 @@ func (d *DatabaseTransaction) BackwardExtremitiesForRoom(
return d.BackwardExtremities.SelectBackwardExtremitiesForRoom(ctx, d.txn, roomID)
}
-func (d *DatabaseTransaction) MaxTopologicalPosition(
- ctx context.Context, roomID string,
-) (types.TopologyToken, error) {
- depth, streamPos, err := d.Topology.SelectMaxPositionInTopology(ctx, d.txn, roomID)
- if err != nil {
- return types.TopologyToken{}, err
- }
- return types.TopologyToken{Depth: depth, PDUPosition: streamPos}, nil
-}
-
func (d *DatabaseTransaction) EventPositionInTopology(
ctx context.Context, eventID string,
) (types.TopologyToken, error) {
@@ -297,11 +288,7 @@ func (d *DatabaseTransaction) StreamToTopologicalPosition(
case err == sql.ErrNoRows && backwardOrdering: // no events in range, going backward
return types.TopologyToken{PDUPosition: streamPos}, nil
case err == sql.ErrNoRows && !backwardOrdering: // no events in range, going forward
- topoPos, streamPos, err = d.Topology.SelectMaxPositionInTopology(ctx, d.txn, roomID)
- if err != nil {
- return types.TopologyToken{}, fmt.Errorf("d.Topology.SelectMaxPositionInTopology: %w", err)
- }
- return types.TopologyToken{Depth: topoPos, PDUPosition: streamPos}, nil
+ return types.TopologyToken{Depth: math.MaxInt64, PDUPosition: math.MaxInt64}, nil
case err != nil: // some other error happened
return types.TopologyToken{}, fmt.Errorf("d.Topology.SelectStreamToTopologicalPosition: %w", err)
default:
diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go
index 81b26498..87945644 100644
--- a/syncapi/storage/sqlite3/output_room_events_topology_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go
@@ -61,10 +61,6 @@ const selectPositionInTopologySQL = "" +
"SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" +
" WHERE event_id = $1"
-const selectMaxPositionInTopologySQL = "" +
- "SELECT MAX(topological_position), stream_position FROM syncapi_output_room_events_topology" +
- " WHERE room_id = $1 ORDER BY stream_position DESC"
-
const selectStreamToTopologicalPositionAscSQL = "" +
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position >= $2 ORDER BY topological_position ASC LIMIT 1;"
@@ -77,7 +73,6 @@ type outputRoomEventsTopologyStatements struct {
selectEventIDsInRangeASCStmt *sql.Stmt
selectEventIDsInRangeDESCStmt *sql.Stmt
selectPositionInTopologyStmt *sql.Stmt
- selectMaxPositionInTopologyStmt *sql.Stmt
selectStreamToTopologicalPositionAscStmt *sql.Stmt
selectStreamToTopologicalPositionDescStmt *sql.Stmt
}
@@ -102,9 +97,6 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil {
return nil, err
}
- if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
- return nil, err
- }
if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil {
return nil, err
}
@@ -182,11 +174,3 @@ func (s *outputRoomEventsTopologyStatements) SelectStreamToTopologicalPosition(
}
return
}
-
-func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
- ctx context.Context, txn *sql.Tx, roomID string,
-) (pos types.StreamPosition, spos types.StreamPosition, err error) {
- stmt := sqlutil.TxStmt(txn, s.selectMaxPositionInTopologyStmt)
- err = stmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
- return
-}
diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go
index 166ddd23..e65367d8 100644
--- a/syncapi/storage/storage_test.go
+++ b/syncapi/storage/storage_test.go
@@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
+ "math"
"reflect"
"testing"
@@ -199,10 +200,7 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) {
_ = MustWriteEvents(t, db, events)
WithSnapshot(t, db, func(snapshot storage.DatabaseTransaction) {
- from, err := snapshot.MaxTopologicalPosition(ctx, r.ID)
- if err != nil {
- t.Fatalf("failed to get MaxTopologicalPosition: %s", err)
- }
+ from := types.TopologyToken{Depth: math.MaxInt64, PDUPosition: math.MaxInt64}
t.Logf("max topo pos = %+v", from)
// head towards the beginning of time
to := types.TopologyToken{}
@@ -219,6 +217,88 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) {
})
}
+func TestStreamToTopologicalPosition(t *testing.T) {
+ alice := test.NewUser(t)
+ r := test.NewRoom(t, alice)
+
+ testCases := []struct {
+ name string
+ roomID string
+ streamPos types.StreamPosition
+ backwardOrdering bool
+ wantToken types.TopologyToken
+ }{
+ {
+ name: "forward ordering found streamPos returns found position",
+ roomID: r.ID,
+ streamPos: 1,
+ backwardOrdering: false,
+ wantToken: types.TopologyToken{Depth: 1, PDUPosition: 1},
+ },
+ {
+ name: "forward ordering not found streamPos returns max position",
+ roomID: r.ID,
+ streamPos: 100,
+ backwardOrdering: false,
+ wantToken: types.TopologyToken{Depth: math.MaxInt64, PDUPosition: math.MaxInt64},
+ },
+ {
+ name: "backward ordering found streamPos returns found position",
+ roomID: r.ID,
+ streamPos: 1,
+ backwardOrdering: true,
+ wantToken: types.TopologyToken{Depth: 1, PDUPosition: 1},
+ },
+ {
+ name: "backward ordering not found streamPos returns maxDepth with param pduPosition",
+ roomID: r.ID,
+ streamPos: 100,
+ backwardOrdering: true,
+ wantToken: types.TopologyToken{Depth: 5, PDUPosition: 100},
+ },
+ {
+ name: "backward non-existent room returns zero token",
+ roomID: "!doesnotexist:localhost",
+ streamPos: 1,
+ backwardOrdering: true,
+ wantToken: types.TopologyToken{Depth: 0, PDUPosition: 1},
+ },
+ {
+ name: "forward non-existent room returns max token",
+ roomID: "!doesnotexist:localhost",
+ streamPos: 1,
+ backwardOrdering: false,
+ wantToken: types.TopologyToken{Depth: math.MaxInt64, PDUPosition: math.MaxInt64},
+ },
+ }
+
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ db, close, closeBase := MustCreateDatabase(t, dbType)
+ defer close()
+ defer closeBase()
+
+ txn, err := db.NewDatabaseTransaction(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer txn.Rollback()
+ MustWriteEvents(t, db, r.Events())
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ token, err := txn.StreamToTopologicalPosition(ctx, tc.roomID, tc.streamPos, tc.backwardOrdering)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if tc.wantToken != token {
+ t.Fatalf("expected token %q, got %q", tc.wantToken, token)
+ }
+ })
+ }
+
+ })
+}
+
/*
// The purpose of this test is to make sure that backpagination returns all events, even if some events have the same depth.
// For cases where events have the same depth, the streaming token should be used to tie break so events written via WriteEvent
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index c02e4ecc..8366a67d 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -91,8 +91,6 @@ type Topology interface {
SelectEventIDsInRange(ctx context.Context, txn *sql.Tx, roomID string, minDepth, maxDepth, maxStreamPos types.StreamPosition, limit int, chronologicalOrder bool) (eventIDs []string, err error)
// SelectPositionInTopology returns the depth and stream position of a given event in the topology of the room it belongs to.
SelectPositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (depth, spos types.StreamPosition, err error)
- // SelectMaxPositionInTopology returns the event which has the highest depth, and if there are multiple, the event with the highest stream position.
- SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (depth types.StreamPosition, spos types.StreamPosition, err error)
// SelectStreamToTopologicalPosition converts a stream position to a topological position by finding the nearest topological position in the room.
SelectStreamToTopologicalPosition(ctx context.Context, txn *sql.Tx, roomID string, streamPos types.StreamPosition, forward bool) (topoPos types.StreamPosition, err error)
}