aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-05-14 16:11:37 +0100
committerGitHub <noreply@github.com>2020-05-14 16:11:37 +0100
commit1b34130a5b1a47bc461fd48c8ca731eaab1a529b (patch)
treef4a54a4ced9304e1c652c50d66c451911782214b /syncapi/storage
parent640a0265df922a5aa3731f3bf61fe99c6acd0bff (diff)
Finish merging syncserver.go (#1033)
* Refactor all postgres tables; start work on sqlite * wip sqlite merges; database is locked errors to investigate and failing tests * Revert "wip sqlite merges; database is locked errors to investigate and failing tests" This reverts commit 26cbfc5b75ae2dc4fb31a838b917aa39d758f162. * convert current room state table * port over sqlite topology table * remove a few functions * remove more functions * Share more code * factor out completesync and a bit more * Remove remaining code
Diffstat (limited to 'syncapi/storage')
-rw-r--r--syncapi/storage/postgres/backwards_extremities_table.go106
-rw-r--r--syncapi/storage/postgres/current_room_state_table.go42
-rw-r--r--syncapi/storage/postgres/output_room_events_topology_table.go50
-rw-r--r--syncapi/storage/postgres/syncserver.go903
-rw-r--r--syncapi/storage/shared/syncserver.go896
-rw-r--r--syncapi/storage/sqlite3/backwards_extremities_table.go106
-rw-r--r--syncapi/storage/sqlite3/current_room_state_table.go39
-rw-r--r--syncapi/storage/sqlite3/output_room_events_topology_table.go32
-rw-r--r--syncapi/storage/sqlite3/syncserver.go1005
-rw-r--r--syncapi/storage/storage.go8
-rw-r--r--syncapi/storage/storage_test.go2
-rw-r--r--syncapi/storage/storage_wasm.go2
-rw-r--r--syncapi/storage/tables/backward_extremities.go175
-rw-r--r--syncapi/storage/tables/interface.go58
14 files changed, 1284 insertions, 2140 deletions
diff --git a/syncapi/storage/postgres/backwards_extremities_table.go b/syncapi/storage/postgres/backwards_extremities_table.go
new file mode 100644
index 00000000..3dd1b234
--- /dev/null
+++ b/syncapi/storage/postgres/backwards_extremities_table.go
@@ -0,0 +1,106 @@
+// Copyright 2018 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
+)
+
+const backwardExtremitiesSchema = `
+-- Stores output room events received from the roomserver.
+CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
+ -- The 'room_id' key for the event.
+ room_id TEXT NOT NULL,
+ -- The event ID for the last known event. This is the backwards extremity.
+ event_id TEXT NOT NULL,
+ -- The prev_events for the last known event. This is used to update extremities.
+ prev_event_id TEXT NOT NULL,
+ PRIMARY KEY(room_id, event_id, prev_event_id)
+);
+`
+
+const insertBackwardExtremitySQL = "" +
+ "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" +
+ " VALUES ($1, $2, $3)" +
+ " ON CONFLICT DO NOTHING"
+
+const selectBackwardExtremitiesForRoomSQL = "" +
+ "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
+
+const deleteBackwardExtremitySQL = "" +
+ "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
+
+type backwardExtremitiesStatements struct {
+ insertBackwardExtremityStmt *sql.Stmt
+ selectBackwardExtremitiesForRoomStmt *sql.Stmt
+ deleteBackwardExtremityStmt *sql.Stmt
+}
+
+func NewPostgresBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremities, error) {
+ s := &backwardExtremitiesStatements{}
+ _, err := db.Exec(backwardExtremitiesSchema)
+ if err != nil {
+ return nil, err
+ }
+ if s.insertBackwardExtremityStmt, err = db.Prepare(insertBackwardExtremitySQL); err != nil {
+ return nil, err
+ }
+ if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil {
+ return nil, err
+ }
+ if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
+ return nil, err
+ }
+ return s, nil
+}
+
+func (s *backwardExtremitiesStatements) InsertsBackwardExtremity(
+ ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string,
+) (err error) {
+ _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID)
+ return
+}
+
+func (s *backwardExtremitiesStatements) SelectBackwardExtremitiesForRoom(
+ ctx context.Context, roomID string,
+) (eventIDs []string, err error) {
+ rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
+ if err != nil {
+ return
+ }
+ defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed")
+
+ for rows.Next() {
+ var eID string
+ if err = rows.Scan(&eID); err != nil {
+ return
+ }
+
+ eventIDs = append(eventIDs, eID)
+ }
+
+ return eventIDs, rows.Err()
+}
+
+func (s *backwardExtremitiesStatements) DeleteBackwardExtremity(
+ ctx context.Context, txn *sql.Tx, roomID, knownEventID string,
+) (err error) {
+ _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
+ return
+}
diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go
index ab8f07b2..b6ab7221 100644
--- a/syncapi/storage/postgres/current_room_state_table.go
+++ b/syncapi/storage/postgres/current_room_state_table.go
@@ -22,6 +22,7 @@ import (
"github.com/lib/pq"
"github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -103,37 +104,38 @@ type currentRoomStateStatements struct {
selectStateEventStmt *sql.Stmt
}
-func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
- _, err = db.Exec(currentRoomStateSchema)
+func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) {
+ s := &currentRoomStateStatements{}
+ _, err := db.Exec(currentRoomStateSchema)
if err != nil {
- return
+ return nil, err
}
if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil {
- return
+ return nil, err
}
if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil {
- return
+ return nil, err
}
if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
- return
+ return nil, err
}
if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil {
- return
+ return nil, err
}
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
- return
+ return nil, err
}
if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
- return
+ return nil, err
}
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
- return
+ return nil, err
}
- return
+ return s, nil
}
-// JoinedMemberLists returns a map of room ID to a list of joined user IDs.
-func (s *currentRoomStateStatements) selectJoinedUsers(
+// SelectJoinedUsers returns a map of room ID to a list of joined user IDs.
+func (s *currentRoomStateStatements) SelectJoinedUsers(
ctx context.Context,
) (map[string][]string, error) {
rows, err := s.selectJoinedUsersStmt.QueryContext(ctx)
@@ -157,7 +159,7 @@ func (s *currentRoomStateStatements) selectJoinedUsers(
}
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
-func (s *currentRoomStateStatements) selectRoomIDsWithMembership(
+func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
ctx context.Context,
txn *sql.Tx,
userID string,
@@ -181,8 +183,8 @@ func (s *currentRoomStateStatements) selectRoomIDsWithMembership(
return result, rows.Err()
}
-// CurrentState returns all the current state events for the given room.
-func (s *currentRoomStateStatements) selectCurrentState(
+// SelectCurrentState returns all the current state events for the given room.
+func (s *currentRoomStateStatements) SelectCurrentState(
ctx context.Context, txn *sql.Tx, roomID string,
stateFilter *gomatrixserverlib.StateFilter,
) ([]gomatrixserverlib.HeaderedEvent, error) {
@@ -203,7 +205,7 @@ func (s *currentRoomStateStatements) selectCurrentState(
return rowsToEvents(rows)
}
-func (s *currentRoomStateStatements) deleteRoomStateByEventID(
+func (s *currentRoomStateStatements) DeleteRoomStateByEventID(
ctx context.Context, txn *sql.Tx, eventID string,
) error {
stmt := common.TxStmt(txn, s.deleteRoomStateByEventIDStmt)
@@ -211,7 +213,7 @@ func (s *currentRoomStateStatements) deleteRoomStateByEventID(
return err
}
-func (s *currentRoomStateStatements) upsertRoomState(
+func (s *currentRoomStateStatements) UpsertRoomState(
ctx context.Context, txn *sql.Tx,
event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition,
) error {
@@ -245,7 +247,7 @@ func (s *currentRoomStateStatements) upsertRoomState(
return err
}
-func (s *currentRoomStateStatements) selectEventsWithEventIDs(
+func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
ctx context.Context, txn *sql.Tx, eventIDs []string,
) ([]types.StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectEventsWithEventIDsStmt)
@@ -274,7 +276,7 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.HeaderedEvent, error) {
return result, rows.Err()
}
-func (s *currentRoomStateStatements) selectStateEvent(
+func (s *currentRoomStateStatements) SelectStateEvent(
ctx context.Context, roomID, evType, stateKey string,
) (*gomatrixserverlib.HeaderedEvent, error) {
stmt := s.selectStateEventStmt
diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go
index 51cbd50d..48ebcf25 100644
--- a/syncapi/storage/postgres/output_room_events_topology_table.go
+++ b/syncapi/storage/postgres/output_room_events_topology_table.go
@@ -20,6 +20,7 @@ import (
"github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -84,36 +85,37 @@ type outputRoomEventsTopologyStatements struct {
selectEventIDsFromPositionStmt *sql.Stmt
}
-func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) {
- _, err = db.Exec(outputRoomEventsTopologySchema)
+func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
+ s := &outputRoomEventsTopologyStatements{}
+ _, err := db.Exec(outputRoomEventsTopologySchema)
if err != nil {
- return
+ return nil, err
}
if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil {
- return
+ return nil, err
}
if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil {
- return
+ return nil, err
}
if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil {
- return
+ return nil, err
}
if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil {
- return
+ return nil, err
}
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
- return
+ return nil, err
}
if s.selectEventIDsFromPositionStmt, err = db.Prepare(selectEventIDsFromPositionSQL); err != nil {
- return
+ return nil, err
}
- return
+ return s, nil
}
-// insertEventInTopology inserts the given event in the room's topology, based
+// InsertEventInTopology inserts the given event in the room's topology, based
// on the event's depth.
-func (s *outputRoomEventsTopologyStatements) insertEventInTopology(
- ctx context.Context, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
+func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
+ ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
) (err error) {
_, err = s.insertEventInTopologyStmt.ExecContext(
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
@@ -121,11 +123,11 @@ func (s *outputRoomEventsTopologyStatements) insertEventInTopology(
return
}
-// selectEventIDsInRange selects the IDs of events which positions are within a
+// SelectEventIDsInRange selects the IDs of events which positions are within a
// given range in a given room's topological order.
// Returns an empty slice if no events match the given range.
-func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange(
- ctx context.Context, roomID string, fromPos, toPos, toMicroPos types.StreamPosition,
+func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
+ ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos, toMicroPos types.StreamPosition,
limit int, chronologicalOrder bool,
) (eventIDs []string, err error) {
// Decide on the selection's order according to whether chronological order
@@ -159,26 +161,26 @@ func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange(
return eventIDs, rows.Err()
}
-// selectPositionInTopology returns the position of a given event in the
+// SelectPositionInTopology returns the position of a given event in the
// topology of the room it belongs to.
-func (s *outputRoomEventsTopologyStatements) selectPositionInTopology(
- ctx context.Context, eventID string,
+func (s *outputRoomEventsTopologyStatements) SelectPositionInTopology(
+ ctx context.Context, txn *sql.Tx, eventID string,
) (pos, spos types.StreamPosition, err error) {
err = s.selectPositionInTopologyStmt.QueryRowContext(ctx, eventID).Scan(&pos, &spos)
return
}
-func (s *outputRoomEventsTopologyStatements) selectMaxPositionInTopology(
- ctx context.Context, roomID string,
+func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
+ ctx context.Context, txn *sql.Tx, roomID string,
) (pos types.StreamPosition, spos types.StreamPosition, err error) {
err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
return
}
-// selectEventIDsFromPosition returns the IDs of all events that have a given
+// SelectEventIDsFromPosition returns the IDs of all events that have a given
// position in the topology of a given room.
-func (s *outputRoomEventsTopologyStatements) selectEventIDsFromPosition(
- ctx context.Context, roomID string, pos types.StreamPosition,
+func (s *outputRoomEventsTopologyStatements) SelectEventIDsFromPosition(
+ ctx context.Context, txn *sql.Tx, roomID string, pos types.StreamPosition,
) (eventIDs []string, err error) {
// Query the event IDs.
rows, err := s.selectEventIDsFromPositionStmt.QueryContext(ctx, roomID, pos)
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
index 4fa08ce5..d44f8b7e 100644
--- a/syncapi/storage/postgres/syncserver.go
+++ b/syncapi/storage/postgres/syncserver.go
@@ -16,47 +16,27 @@
package postgres
import (
- "context"
"database/sql"
- "encoding/json"
- "fmt"
- "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/internal/sqlutil"
- "github.com/matrix-org/dendrite/roomserver/api"
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
- "github.com/matrix-org/dendrite/syncapi/storage/tables"
- "github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
)
-type stateDelta struct {
- roomID string
- stateEvents []gomatrixserverlib.HeaderedEvent
- membership string
- // The PDU stream position of the latest membership event for this user, if applicable.
- // Can be 0 if there is no membership event in this delta.
- membershipPos types.StreamPosition
-}
-
// SyncServerDatasource represents a sync server datasource which manages
// both the database for PDUs and caches for EDUs.
type SyncServerDatasource struct {
shared.Database
db *sql.DB
common.PartitionOffsetStatements
- roomstate currentRoomStateStatements
- topology outputRoomEventsTopologyStatements
- backwardExtremities tables.BackwardsExtremities
}
-// NewSyncServerDatasource creates a new sync server database
-func NewSyncServerDatasource(dbDataSourceName string, dbProperties common.DbProperties) (*SyncServerDatasource, error) {
+// NewDatabase creates a new sync server database
+func NewDatabase(dbDataSourceName string, dbProperties common.DbProperties) (*SyncServerDatasource, error) {
var d SyncServerDatasource
var err error
if d.db, err = sqlutil.Open("postgres", dbDataSourceName, dbProperties); err != nil {
@@ -73,884 +53,31 @@ func NewSyncServerDatasource(dbDataSourceName string, dbProperties common.DbProp
if err != nil {
return nil, err
}
- if err = d.roomstate.prepare(d.db); err != nil {
+ currState, err := NewPostgresCurrentRoomStateTable(d.db)
+ if err != nil {
return nil, err
}
invites, err := NewPostgresInvitesTable(d.db)
if err != nil {
return nil, err
}
- if err = d.topology.prepare(d.db); err != nil {
+ topology, err := NewPostgresTopologyTable(d.db)
+ if err != nil {
return nil, err
}
- d.backwardExtremities, err = tables.NewBackwardsExtremities(d.db, &tables.PostgresBackwardsExtremitiesStatements{})
+ backwardExtremities, err := NewPostgresBackwardsExtremitiesTable(d.db)
if err != nil {
return nil, err
}
d.Database = shared.Database{
- DB: d.db,
- Invites: invites,
- AccountData: accountData,
- OutputEvents: events,
- EDUCache: cache.New(),
+ DB: d.db,
+ Invites: invites,
+ AccountData: accountData,
+ OutputEvents: events,
+ Topology: topology,
+ CurrentRoomState: currState,
+ BackwardExtremities: backwardExtremities,
+ EDUCache: cache.New(),
}
return &d, nil
}
-
-func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) {
- return d.roomstate.selectJoinedUsers(ctx)
-}
-
-// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of
-// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
-// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.
-func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
- if err := d.backwardExtremities.DeleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
- return err
- }
-
- // Check if we have all of the event's previous events. If an event is
- // missing, add it to the room's backward extremities.
- prevEvents, err := d.Database.OutputEvents.SelectEvents(ctx, txn, ev.PrevEventIDs())
- if err != nil {
- return err
- }
- var found bool
- for _, eID := range ev.PrevEventIDs() {
- found = false
- for _, prevEv := range prevEvents {
- if eID == prevEv.EventID() {
- found = true
- }
- }
-
- // If the event is missing, consider it a backward extremity.
- if !found {
- if err = d.backwardExtremities.InsertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil {
- return err
- }
- }
- }
-
- return nil
-}
-
-func (d *SyncServerDatasource) WriteEvent(
- ctx context.Context,
- ev *gomatrixserverlib.HeaderedEvent,
- addStateEvents []gomatrixserverlib.HeaderedEvent,
- addStateEventIDs, removeStateEventIDs []string,
- transactionID *api.TransactionID, excludeFromSync bool,
-) (pduPosition types.StreamPosition, returnErr error) {
- returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error {
- var err error
- pos, err := d.Database.OutputEvents.InsertEvent(
- ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync,
- )
- if err != nil {
- return err
- }
- pduPosition = pos
-
- if err = d.topology.insertEventInTopology(ctx, ev, pos); err != nil {
- return err
- }
-
- if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil {
- return err
- }
-
- if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 {
- // Nothing to do, the event may have just been a message event.
- return nil
- }
-
- return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition)
- })
-
- return pduPosition, returnErr
-}
-
-func (d *SyncServerDatasource) updateRoomState(
- ctx context.Context, txn *sql.Tx,
- removedEventIDs []string,
- addedEvents []gomatrixserverlib.HeaderedEvent,
- pduPosition types.StreamPosition,
-) error {
- // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
- for _, eventID := range removedEventIDs {
- if err := d.roomstate.deleteRoomStateByEventID(ctx, txn, eventID); err != nil {
- return err
- }
- }
-
- for _, event := range addedEvents {
- if event.StateKey() == nil {
- // ignore non state events
- continue
- }
- var membership *string
- if event.Type() == "m.room.member" {
- value, err := event.Membership()
- if err != nil {
- return err
- }
- membership = &value
- }
- if err := d.roomstate.upsertRoomState(ctx, txn, event, membership, pduPosition); err != nil {
- return err
- }
- }
-
- return nil
-}
-
-func (d *SyncServerDatasource) GetStateEvent(
- ctx context.Context, roomID, evType, stateKey string,
-) (*gomatrixserverlib.HeaderedEvent, error) {
- return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey)
-}
-
-func (d *SyncServerDatasource) GetStateEventsForRoom(
- ctx context.Context, roomID string, stateFilter *gomatrixserverlib.StateFilter,
-) (stateEvents []gomatrixserverlib.HeaderedEvent, err error) {
- err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
- stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilter)
- return err
- })
- return
-}
-
-func (d *SyncServerDatasource) GetEventsInTopologicalRange(
- ctx context.Context,
- from, to *types.TopologyToken,
- roomID string, limit int,
- backwardOrdering bool,
-) (events []types.StreamEvent, err error) {
- // Determine the backward and forward limit, i.e. the upper and lower
- // limits to the selection in the room's topology, from the direction.
- var backwardLimit, forwardLimit, forwardMicroLimit types.StreamPosition
- if backwardOrdering {
- // Backward ordering is antichronological (latest event to oldest
- // one).
- backwardLimit = to.Depth()
- forwardLimit = from.Depth()
- forwardMicroLimit = from.PDUPosition()
- } else {
- // Forward ordering is chronological (oldest event to latest one).
- backwardLimit = from.Depth()
- forwardLimit = to.Depth()
- }
-
- // Select the event IDs from the defined range.
- var eIDs []string
- eIDs, err = d.topology.selectEventIDsInRange(
- ctx, roomID, backwardLimit, forwardLimit, forwardMicroLimit, limit, !backwardOrdering,
- )
- if err != nil {
- return
- }
-
- // Retrieve the events' contents using their IDs.
- events, err = d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs)
- return
-}
-
-func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.StreamingToken, error) {
- return d.syncPositionTx(ctx, nil)
-}
-
-func (d *SyncServerDatasource) BackwardExtremitiesForRoom(
- ctx context.Context, roomID string,
-) (backwardExtremities []string, err error) {
- return d.backwardExtremities.SelectBackwardExtremitiesForRoom(ctx, roomID)
-}
-
-func (d *SyncServerDatasource) MaxTopologicalPosition(
- ctx context.Context, roomID string,
-) (depth types.StreamPosition, stream types.StreamPosition, err error) {
- return d.topology.selectMaxPositionInTopology(ctx, roomID)
-}
-
-func (d *SyncServerDatasource) EventsAtTopologicalPosition(
- ctx context.Context, roomID string, pos types.StreamPosition,
-) ([]types.StreamEvent, error) {
- eIDs, err := d.topology.selectEventIDsFromPosition(ctx, roomID, pos)
- if err != nil {
- return nil, err
- }
-
- return d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs)
-}
-
-func (d *SyncServerDatasource) EventPositionInTopology(
- ctx context.Context, eventID string,
-) (depth types.StreamPosition, stream types.StreamPosition, err error) {
- return d.topology.selectPositionInTopology(ctx, eventID)
-}
-
-func (d *SyncServerDatasource) syncPositionTx(
- ctx context.Context, txn *sql.Tx,
-) (sp types.StreamingToken, err error) {
-
- maxEventID, err := d.Database.OutputEvents.SelectMaxEventID(ctx, txn)
- if err != nil {
- return sp, err
- }
- maxAccountDataID, err := d.Database.AccountData.SelectMaxAccountDataID(ctx, txn)
- if err != nil {
- return sp, err
- }
- if maxAccountDataID > maxEventID {
- maxEventID = maxAccountDataID
- }
- maxInviteID, err := d.Database.Invites.SelectMaxInviteID(ctx, txn)
- if err != nil {
- return sp, err
- }
- if maxInviteID > maxEventID {
- maxEventID = maxInviteID
- }
- sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.Database.EDUCache.GetLatestSyncPosition()))
- return
-}
-
-// addPDUDeltaToResponse adds all PDU deltas to a sync response.
-// IDs of all rooms the user joined are returned so EDU deltas can be added for them.
-func (d *SyncServerDatasource) addPDUDeltaToResponse(
- ctx context.Context,
- device authtypes.Device,
- fromPos, toPos types.StreamPosition,
- numRecentEventsPerRoom int,
- wantFullState bool,
- res *types.Response,
-) (joinedRoomIDs []string, err error) {
- txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
- if err != nil {
- return nil, err
- }
- var succeeded bool
- defer func() {
- txerr := common.EndTransaction(txn, &succeeded)
- if err == nil && txerr != nil {
- err = txerr
- }
- }()
-
- stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
-
- // Work out which rooms to return in the response. This is done by getting not only the currently
- // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions.
- // This works out what the 'state' key should be for each room as well as which membership block
- // to put the room into.
- var deltas []stateDelta
- if !wantFullState {
- deltas, joinedRoomIDs, err = d.getStateDeltas(
- ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter,
- )
- } else {
- deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(
- ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter,
- )
- }
- if err != nil {
- return nil, err
- }
-
- for _, delta := range deltas {
- err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res)
- if err != nil {
- return nil, err
- }
- }
-
- // TODO: This should be done in getStateDeltas
- if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil {
- return nil, err
- }
-
- succeeded = true
- return joinedRoomIDs, nil
-}
-
-// addTypingDeltaToResponse adds all typing notifications to a sync response
-// since the specified position.
-func (d *SyncServerDatasource) addTypingDeltaToResponse(
- since types.StreamingToken,
- joinedRoomIDs []string,
- res *types.Response,
-) error {
- var jr types.JoinResponse
- var ok bool
- var err error
- for _, roomID := range joinedRoomIDs {
- if typingUsers, updated := d.Database.EDUCache.GetTypingUsersIfUpdatedAfter(
- roomID, int64(since.EDUPosition()),
- ); updated {
- ev := gomatrixserverlib.ClientEvent{
- Type: gomatrixserverlib.MTyping,
- }
- ev.Content, err = json.Marshal(map[string]interface{}{
- "user_ids": typingUsers,
- })
- if err != nil {
- return err
- }
-
- if jr, ok = res.Rooms.Join[roomID]; !ok {
- jr = *types.NewJoinResponse()
- }
- jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
- res.Rooms.Join[roomID] = jr
- }
- }
- return nil
-}
-
-// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if
-// the positions of that type are not equal in fromPos and toPos.
-func (d *SyncServerDatasource) addEDUDeltaToResponse(
- fromPos, toPos types.StreamingToken,
- joinedRoomIDs []string,
- res *types.Response,
-) (err error) {
-
- if fromPos.EDUPosition() != toPos.EDUPosition() {
- err = d.addTypingDeltaToResponse(
- fromPos, joinedRoomIDs, res,
- )
- }
-
- return
-}
-
-func (d *SyncServerDatasource) IncrementalSync(
- ctx context.Context,
- device authtypes.Device,
- fromPos, toPos types.StreamingToken,
- numRecentEventsPerRoom int,
- wantFullState bool,
-) (*types.Response, error) {
- nextBatchPos := fromPos.WithUpdates(toPos)
- res := types.NewResponse(nextBatchPos)
-
- var joinedRoomIDs []string
- var err error
- if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState {
- joinedRoomIDs, err = d.addPDUDeltaToResponse(
- ctx, device, fromPos.PDUPosition(), toPos.PDUPosition(), numRecentEventsPerRoom, wantFullState, res,
- )
- } else {
- joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(
- ctx, nil, device.UserID, gomatrixserverlib.Join,
- )
- }
- if err != nil {
- return nil, err
- }
-
- err = d.addEDUDeltaToResponse(
- fromPos, toPos, joinedRoomIDs, res,
- )
- if err != nil {
- return nil, err
- }
-
- return res, nil
-}
-
-// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed
-// to it. It returns toPos and joinedRoomIDs for use of adding EDUs.
-func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
- ctx context.Context,
- userID string,
- numRecentEventsPerRoom int,
-) (
- res *types.Response,
- toPos types.StreamingToken,
- joinedRoomIDs []string,
- err error,
-) {
- // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
- // a consistent view of the database throughout. This includes extracting the sync position.
- // This does have the unfortunate side-effect that all the matrixy logic resides in this function,
- // but it's better to not hide the fact that this is being done in a transaction.
- txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
- if err != nil {
- return
- }
- var succeeded bool
- defer func() {
- txerr := common.EndTransaction(txn, &succeeded)
- if err == nil && txerr != nil {
- err = txerr
- }
- }()
-
- // Get the current sync position which we will base the sync response on.
- toPos, err = d.syncPositionTx(ctx, txn)
- if err != nil {
- return
- }
-
- res = types.NewResponse(toPos)
-
- // Extract room state and recent events for all rooms the user is joined to.
- joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
- if err != nil {
- return
- }
-
- stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
-
- // Build up a /sync response. Add joined rooms.
- for _, roomID := range joinedRoomIDs {
- var stateEvents []gomatrixserverlib.HeaderedEvent
- stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilter)
- if err != nil {
- return
- }
- // TODO: When filters are added, we may need to call this multiple times to get enough events.
- // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
- var recentStreamEvents []types.StreamEvent
- recentStreamEvents, err = d.Database.OutputEvents.SelectRecentEvents(
- ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(),
- numRecentEventsPerRoom, true, true,
- )
- if err != nil {
- return
- }
-
- // Retrieve the backward topology position, i.e. the position of the
- // oldest event in the room's topology.
- var prevBatchStr string
- if len(recentStreamEvents) > 0 {
- var backwardTopologyPos, backwardStreamPos types.StreamPosition
- backwardTopologyPos, backwardStreamPos, err = d.topology.selectPositionInTopology(ctx, recentStreamEvents[0].EventID())
- if err != nil {
- return
- }
- prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos)
- prevBatch.Decrement()
- prevBatchStr = prevBatch.String()
- }
-
- // We don't include a device here as we don't need to send down
- // transaction IDs for complete syncs
- recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents)
- stateEvents = removeDuplicates(stateEvents, recentEvents)
- jr := types.NewJoinResponse()
- jr.Timeline.PrevBatch = prevBatchStr
- jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- jr.Timeline.Limited = true
- jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
- res.Rooms.Join[roomID] = *jr
- }
-
- if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil {
- return
- }
-
- succeeded = true
- return res, toPos, joinedRoomIDs, err
-}
-
-func (d *SyncServerDatasource) CompleteSync(
- ctx context.Context, userID string, numRecentEventsPerRoom int,
-) (*types.Response, error) {
- res, toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync(
- ctx, userID, numRecentEventsPerRoom,
- )
- if err != nil {
- return nil, err
- }
-
- // Use a zero value SyncPosition for fromPos so all EDU states are added.
- err = d.addEDUDeltaToResponse(
- types.NewStreamToken(0, 0), toPos, joinedRoomIDs, res,
- )
- if err != nil {
- return nil, err
- }
-
- return res, nil
-}
-
-var txReadOnlySnapshot = sql.TxOptions{
- // Set the isolation level so that we see a snapshot of the database.
- // In PostgreSQL repeatable read transactions will see a snapshot taken
- // at the first query, and since the transaction is read-only it can't
- // run into any serialisation errors.
- // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ
- Isolation: sql.LevelRepeatableRead,
- ReadOnly: true,
-}
-
-func (d *SyncServerDatasource) addInvitesToResponse(
- ctx context.Context, txn *sql.Tx,
- userID string,
- fromPos, toPos types.StreamPosition,
- res *types.Response,
-) error {
- invites, err := d.Database.Invites.SelectInviteEventsInRange(
- ctx, txn, userID, fromPos, toPos,
- )
- if err != nil {
- return err
- }
- for roomID, inviteEvent := range invites {
- ir := types.NewInviteResponse(inviteEvent)
- res.Rooms.Invite[roomID] = *ir
- }
- return nil
-}
-
-// Retrieve the backward topology position, i.e. the position of the
-// oldest event in the room's topology.
-func (d *SyncServerDatasource) getBackwardTopologyPos(
- ctx context.Context,
- events []types.StreamEvent,
-) (pos, spos types.StreamPosition) {
- if len(events) > 0 {
- pos, spos, _ = d.topology.selectPositionInTopology(ctx, events[0].EventID())
- }
- if pos-1 <= 0 {
- pos = types.StreamPosition(1)
- } else {
- pos = pos - 1
- }
- return
-}
-
-// addRoomDeltaToResponse adds a room state delta to a sync response
-func (d *SyncServerDatasource) addRoomDeltaToResponse(
- ctx context.Context,
- device *authtypes.Device,
- txn *sql.Tx,
- fromPos, toPos types.StreamPosition,
- delta stateDelta,
- numRecentEventsPerRoom int,
- res *types.Response,
-) error {
- endPos := toPos
- if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave {
- // make sure we don't leak recent events after the leave event.
- // TODO: History visibility makes this somewhat complex to handle correctly. For example:
- // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
- // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
- // in a single /sync request
- // This is all "okay" assuming history_visibility == "shared" which it is by default.
- endPos = delta.membershipPos
- }
- recentStreamEvents, err := d.Database.OutputEvents.SelectRecentEvents(
- ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos),
- numRecentEventsPerRoom, true, true,
- )
- if err != nil {
- return err
- }
- recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
- delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
- backwardTopologyPos, backwardStreamPos := d.getBackwardTopologyPos(ctx, recentStreamEvents)
- prevBatch := types.NewTopologyToken(
- backwardTopologyPos, backwardStreamPos,
- )
-
- switch delta.membership {
- case gomatrixserverlib.Join:
- jr := types.NewJoinResponse()
-
- jr.Timeline.PrevBatch = prevBatch.String()
- jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
- jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
- res.Rooms.Join[delta.roomID] = *jr
- case gomatrixserverlib.Leave:
- fallthrough // transitions to leave are the same as ban
- case gomatrixserverlib.Ban:
- // TODO: recentEvents may contain events that this user is not allowed to see because they are
- // no longer in the room.
- lr := types.NewLeaveResponse()
- lr.Timeline.PrevBatch = prevBatch.String()
- lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
- lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
- res.Rooms.Leave[delta.roomID] = *lr
- }
-
- return nil
-}
-
-// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database.
-// Returns a map of room ID to list of events.
-func (d *SyncServerDatasource) fetchStateEvents(
- ctx context.Context, txn *sql.Tx,
- roomIDToEventIDSet map[string]map[string]bool,
- eventIDToEvent map[string]types.StreamEvent,
-) (map[string][]types.StreamEvent, error) {
- stateBetween := make(map[string][]types.StreamEvent)
- missingEvents := make(map[string][]string)
- for roomID, ids := range roomIDToEventIDSet {
- events := stateBetween[roomID]
- for id, need := range ids {
- if !need {
- continue // deleted state
- }
- e, ok := eventIDToEvent[id]
- if ok {
- events = append(events, e)
- } else {
- m := missingEvents[roomID]
- m = append(m, id)
- missingEvents[roomID] = m
- }
- }
- stateBetween[roomID] = events
- }
-
- if len(missingEvents) > 0 {
- // This happens when add_state_ids has an event ID which is not in the provided range.
- // We need to explicitly fetch them.
- allMissingEventIDs := []string{}
- for _, missingEvIDs := range missingEvents {
- allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...)
- }
- evs, err := d.fetchMissingStateEvents(ctx, txn, allMissingEventIDs)
- if err != nil {
- return nil, err
- }
- // we know we got them all otherwise an error would've been returned, so just loop the events
- for _, ev := range evs {
- roomID := ev.RoomID()
- stateBetween[roomID] = append(stateBetween[roomID], ev)
- }
- }
- return stateBetween, nil
-}
-
-func (d *SyncServerDatasource) fetchMissingStateEvents(
- ctx context.Context, txn *sql.Tx, eventIDs []string,
-) ([]types.StreamEvent, error) {
- // Fetch from the events table first so we pick up the stream ID for the
- // event.
- events, err := d.Database.OutputEvents.SelectEvents(ctx, txn, eventIDs)
- if err != nil {
- return nil, err
- }
-
- have := map[string]bool{}
- for _, event := range events {
- have[event.EventID()] = true
- }
- var missing []string
- for _, eventID := range eventIDs {
- if !have[eventID] {
- missing = append(missing, eventID)
- }
- }
- if len(missing) == 0 {
- return events, nil
- }
-
- // If they are missing from the events table then they should be state
- // events that we received from outside the main event stream.
- // These should be in the room state table.
- stateEvents, err := d.roomstate.selectEventsWithEventIDs(ctx, txn, missing)
-
- if err != nil {
- return nil, err
- }
- if len(stateEvents) != len(missing) {
- return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing))
- }
- events = append(events, stateEvents...)
- return events, nil
-}
-
-// getStateDeltas returns the state deltas between fromPos and toPos,
-// exclusive of oldPos, inclusive of newPos, for the rooms in which
-// the user has new membership events.
-// A list of joined room IDs is also returned in case the caller needs it.
-func (d *SyncServerDatasource) getStateDeltas(
- ctx context.Context, device *authtypes.Device, txn *sql.Tx,
- fromPos, toPos types.StreamPosition, userID string,
- stateFilter *gomatrixserverlib.StateFilter,
-) ([]stateDelta, []string, error) {
- // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
- // - Get membership list changes for this user in this sync response
- // - For each room which has membership list changes:
- // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO).
- // If it is, then we need to send the full room state down (and 'limited' is always true).
- // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
- // * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block.
- // - Get all CURRENTLY joined rooms, and add them to 'joined' block.
- var deltas []stateDelta
-
- // get all the state events ever between these two positions
- stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter)
- if err != nil {
- return nil, nil, err
- }
- state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
- if err != nil {
- return nil, nil, err
- }
-
- for roomID, stateStreamEvents := range state {
- for _, ev := range stateStreamEvents {
- // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
- // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this,
- // dupe join events will result in the entire room state coming down to the client again. This is added in
- // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
- // the timeline.
- if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
- if membership == gomatrixserverlib.Join {
- // send full room state down instead of a delta
- var s []types.StreamEvent
- s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilter)
- if err != nil {
- return nil, nil, err
- }
- state[roomID] = s
- continue // we'll add this room in when we do joined rooms
- }
-
- deltas = append(deltas, stateDelta{
- membership: membership,
- membershipPos: ev.StreamPosition,
- stateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
- roomID: roomID,
- })
- break
- }
- }
- }
-
- // Add in currently joined rooms
- joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
- if err != nil {
- return nil, nil, err
- }
- for _, joinedRoomID := range joinedRoomIDs {
- deltas = append(deltas, stateDelta{
- membership: gomatrixserverlib.Join,
- stateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]),
- roomID: joinedRoomID,
- })
- }
-
- return deltas, joinedRoomIDs, nil
-}
-
-// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync
-// requests with full_state=true.
-// Fetches full state for all joined rooms and uses selectStateInRange to get
-// updates for other rooms.
-func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
- ctx context.Context, device *authtypes.Device, txn *sql.Tx,
- fromPos, toPos types.StreamPosition, userID string,
- stateFilter *gomatrixserverlib.StateFilter,
-) ([]stateDelta, []string, error) {
- joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
- if err != nil {
- return nil, nil, err
- }
-
- // Use a reasonable initial capacity
- deltas := make([]stateDelta, 0, len(joinedRoomIDs))
-
- // Add full states for all joined rooms
- for _, joinedRoomID := range joinedRoomIDs {
- s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, stateFilter)
- if stateErr != nil {
- return nil, nil, stateErr
- }
- deltas = append(deltas, stateDelta{
- membership: gomatrixserverlib.Join,
- stateEvents: d.StreamEventsToEvents(device, s),
- roomID: joinedRoomID,
- })
- }
-
- // Get all the state events ever between these two positions
- stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter)
- if err != nil {
- return nil, nil, err
- }
- state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
- if err != nil {
- return nil, nil, err
- }
-
- for roomID, stateStreamEvents := range state {
- for _, ev := range stateStreamEvents {
- if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
- if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above.
- deltas = append(deltas, stateDelta{
- membership: membership,
- membershipPos: ev.StreamPosition,
- stateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
- roomID: roomID,
- })
- }
-
- break
- }
- }
- }
-
- return deltas, joinedRoomIDs, nil
-}
-
-func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
- ctx context.Context, txn *sql.Tx, roomID string,
- stateFilter *gomatrixserverlib.StateFilter,
-) ([]types.StreamEvent, error) {
- allState, err := d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilter)
- if err != nil {
- return nil, err
- }
- s := make([]types.StreamEvent, len(allState))
- for i := 0; i < len(s); i++ {
- s[i] = types.StreamEvent{HeaderedEvent: allState[i], StreamPosition: 0}
- }
- return s, nil
-}
-
-// There may be some overlap where events in stateEvents are already in recentEvents, so filter
-// them out so we don't include them twice in the /sync response. They should be in recentEvents
-// only, so clients get to the correct state once they have rolled forward.
-func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent {
- for _, recentEv := range recentEvents {
- if recentEv.StateKey() == nil {
- continue // not a state event
- }
- // TODO: This is a linear scan over all the current state events in this room. This will
- // be slow for big rooms. We should instead sort the state events by event ID (ORDER BY)
- // then do a binary search to find matching events, similar to what roomserver does.
- for j := 0; j < len(stateEvents); j++ {
- if stateEvents[j].EventID() == recentEv.EventID() {
- // overwrite the element to remove with the last element then pop the last element.
- // This is orders of magnitude faster than re-slicing, but doesn't preserve ordering
- // (we don't care about the order of stateEvents)
- stateEvents[j] = stateEvents[len(stateEvents)-1]
- stateEvents = stateEvents[:len(stateEvents)-1]
- break // there shouldn't be multiple events with the same event ID
- }
- }
- }
- return stateEvents
-}
-
-// getMembershipFromEvent returns the value of content.membership iff the event is a state event
-// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
-func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
- if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
- membership, err := ev.Membership()
- if err != nil {
- return ""
- }
- return membership
- }
- return ""
-}
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index e9fed758..96e9ff61 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -3,11 +3,14 @@ package shared
import (
"context"
"database/sql"
+ "encoding/json"
+ "fmt"
"time"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/eduserver/cache"
+ "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
@@ -17,11 +20,14 @@ import (
// Database is a temporary struct until we have made syncserver.go the same for both pq/sqlite
// For now this contains the shared functions
type Database struct {
- DB *sql.DB
- Invites tables.Invites
- AccountData tables.AccountData
- OutputEvents tables.Events
- EDUCache *cache.EDUCache
+ DB *sql.DB
+ Invites tables.Invites
+ AccountData tables.AccountData
+ OutputEvents tables.Events
+ Topology tables.Topology
+ CurrentRoomState tables.CurrentRoomState
+ BackwardExtremities tables.BackwardsExtremities
+ EDUCache *cache.EDUCache
}
// Events lookups a list of event by their event ID.
@@ -82,6 +88,26 @@ func (d *Database) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) {
d.EDUCache.SetTimeoutCallback(fn)
}
+func (d *Database) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) {
+ return d.CurrentRoomState.SelectJoinedUsers(ctx)
+}
+
+func (d *Database) GetStateEvent(
+ ctx context.Context, roomID, evType, stateKey string,
+) (*gomatrixserverlib.HeaderedEvent, error) {
+ return d.CurrentRoomState.SelectStateEvent(ctx, roomID, evType, stateKey)
+}
+
+func (d *Database) GetStateEventsForRoom(
+ ctx context.Context, roomID string, stateFilter *gomatrixserverlib.StateFilter,
+) (stateEvents []gomatrixserverlib.HeaderedEvent, err error) {
+ err = common.WithTransaction(d.DB, func(txn *sql.Tx) error {
+ stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter)
+ return err
+ })
+ return
+}
+
func (d *Database) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) {
var maxID int64
var err error
@@ -182,3 +208,863 @@ func (d *Database) StreamEventsToEvents(device *authtypes.Device, in []types.Str
}
return out
}
+
+// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of
+// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
+// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.
+func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
+ if err := d.BackwardExtremities.DeleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
+ return err
+ }
+
+ // Check if we have all of the event's previous events. If an event is
+ // missing, add it to the room's backward extremities.
+ prevEvents, err := d.OutputEvents.SelectEvents(ctx, txn, ev.PrevEventIDs())
+ if err != nil {
+ return err
+ }
+ var found bool
+ for _, eID := range ev.PrevEventIDs() {
+ found = false
+ for _, prevEv := range prevEvents {
+ if eID == prevEv.EventID() {
+ found = true
+ }
+ }
+
+ // If the event is missing, consider it a backward extremity.
+ if !found {
+ if err = d.BackwardExtremities.InsertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
+
+func (d *Database) WriteEvent(
+ ctx context.Context,
+ ev *gomatrixserverlib.HeaderedEvent,
+ addStateEvents []gomatrixserverlib.HeaderedEvent,
+ addStateEventIDs, removeStateEventIDs []string,
+ transactionID *api.TransactionID, excludeFromSync bool,
+) (pduPosition types.StreamPosition, returnErr error) {
+ returnErr = common.WithTransaction(d.DB, func(txn *sql.Tx) error {
+ var err error
+ pos, err := d.OutputEvents.InsertEvent(
+ ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync,
+ )
+ if err != nil {
+ return err
+ }
+ pduPosition = pos
+
+ if err = d.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil {
+ return err
+ }
+
+ if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil {
+ return err
+ }
+
+ if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 {
+ // Nothing to do, the event may have just been a message event.
+ return nil
+ }
+
+ return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition)
+ })
+
+ return pduPosition, returnErr
+}
+
+func (d *Database) updateRoomState(
+ ctx context.Context, txn *sql.Tx,
+ removedEventIDs []string,
+ addedEvents []gomatrixserverlib.HeaderedEvent,
+ pduPosition types.StreamPosition,
+) error {
+ // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
+ for _, eventID := range removedEventIDs {
+ if err := d.CurrentRoomState.DeleteRoomStateByEventID(ctx, txn, eventID); err != nil {
+ return err
+ }
+ }
+
+ for _, event := range addedEvents {
+ if event.StateKey() == nil {
+ // ignore non state events
+ continue
+ }
+ var membership *string
+ if event.Type() == "m.room.member" {
+ value, err := event.Membership()
+ if err != nil {
+ return err
+ }
+ membership = &value
+ }
+ if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (d *Database) GetEventsInTopologicalRange(
+ ctx context.Context,
+ from, to *types.TopologyToken,
+ roomID string, limit int,
+ backwardOrdering bool,
+) (events []types.StreamEvent, err error) {
+ // Determine the backward and forward limit, i.e. the upper and lower
+ // limits to the selection in the room's topology, from the direction.
+ var backwardLimit, forwardLimit, forwardMicroLimit types.StreamPosition
+ if backwardOrdering {
+ // Backward ordering is antichronological (latest event to oldest
+ // one).
+ backwardLimit = to.Depth()
+ forwardLimit = from.Depth()
+ forwardMicroLimit = from.PDUPosition()
+ } else {
+ // Forward ordering is chronological (oldest event to latest one).
+ backwardLimit = from.Depth()
+ forwardLimit = to.Depth()
+ }
+
+ // Select the event IDs from the defined range.
+ var eIDs []string
+ eIDs, err = d.Topology.SelectEventIDsInRange(
+ ctx, nil, roomID, backwardLimit, forwardLimit, forwardMicroLimit, limit, !backwardOrdering,
+ )
+ if err != nil {
+ return
+ }
+
+ // Retrieve the events' contents using their IDs.
+ events, err = d.OutputEvents.SelectEvents(ctx, nil, eIDs)
+ return
+}
+
+func (d *Database) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) {
+ err = common.WithTransaction(d.DB, func(txn *sql.Tx) error {
+ pos, err := d.SyncPositionTx(ctx, txn)
+ if err != nil {
+ return err
+ }
+ tok = pos
+ return nil
+ })
+ return
+}
+
+func (d *Database) BackwardExtremitiesForRoom(
+ ctx context.Context, roomID string,
+) (backwardExtremities []string, err error) {
+ return d.BackwardExtremities.SelectBackwardExtremitiesForRoom(ctx, roomID)
+}
+
+func (d *Database) MaxTopologicalPosition(
+ ctx context.Context, roomID string,
+) (depth types.StreamPosition, stream types.StreamPosition, err error) {
+ return d.Topology.SelectMaxPositionInTopology(ctx, nil, roomID)
+}
+
+func (d *Database) EventsAtTopologicalPosition(
+ ctx context.Context, roomID string, pos types.StreamPosition,
+) ([]types.StreamEvent, error) {
+ eIDs, err := d.Topology.SelectEventIDsFromPosition(ctx, nil, roomID, pos)
+ if err != nil {
+ return nil, err
+ }
+
+ return d.OutputEvents.SelectEvents(ctx, nil, eIDs)
+}
+
+func (d *Database) EventPositionInTopology(
+ ctx context.Context, eventID string,
+) (depth types.StreamPosition, stream types.StreamPosition, err error) {
+ return d.Topology.SelectPositionInTopology(ctx, nil, eventID)
+}
+
+// TODO FIXME TEMPORARY PUBLIC
+func (d *Database) SyncPositionTx(
+ ctx context.Context, txn *sql.Tx,
+) (sp types.StreamingToken, err error) {
+
+ maxEventID, err := d.OutputEvents.SelectMaxEventID(ctx, txn)
+ if err != nil {
+ return sp, err
+ }
+ maxAccountDataID, err := d.AccountData.SelectMaxAccountDataID(ctx, txn)
+ if err != nil {
+ return sp, err
+ }
+ if maxAccountDataID > maxEventID {
+ maxEventID = maxAccountDataID
+ }
+ maxInviteID, err := d.Invites.SelectMaxInviteID(ctx, txn)
+ if err != nil {
+ return sp, err
+ }
+ if maxInviteID > maxEventID {
+ maxEventID = maxInviteID
+ }
+ sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.EDUCache.GetLatestSyncPosition()))
+ return
+}
+
+// addPDUDeltaToResponse adds all PDU deltas to a sync response.
+// IDs of all rooms the user joined are returned so EDU deltas can be added for them.
+func (d *Database) addPDUDeltaToResponse(
+ ctx context.Context,
+ device authtypes.Device,
+ fromPos, toPos types.StreamPosition,
+ numRecentEventsPerRoom int,
+ wantFullState bool,
+ res *types.Response,
+) (joinedRoomIDs []string, err error) {
+ txn, err := d.DB.BeginTx(ctx, &txReadOnlySnapshot)
+ if err != nil {
+ return nil, err
+ }
+ var succeeded bool
+ defer func() {
+ txerr := common.EndTransaction(txn, &succeeded)
+ if err == nil && txerr != nil {
+ err = txerr
+ }
+ }()
+
+ stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
+
+ // Work out which rooms to return in the response. This is done by getting not only the currently
+ // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions.
+ // This works out what the 'state' key should be for each room as well as which membership block
+ // to put the room into.
+ var deltas []stateDelta
+ if !wantFullState {
+ deltas, joinedRoomIDs, err = d.getStateDeltas(
+ ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter,
+ )
+ } else {
+ deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(
+ ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter,
+ )
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ for _, delta := range deltas {
+ err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ // TODO: This should be done in getStateDeltas
+ if err = d.AddInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil {
+ return nil, err
+ }
+
+ succeeded = true
+ return joinedRoomIDs, nil
+}
+
+// addTypingDeltaToResponse adds all typing notifications to a sync response
+// since the specified position.
+func (d *Database) addTypingDeltaToResponse(
+ since types.StreamingToken,
+ joinedRoomIDs []string,
+ res *types.Response,
+) error {
+ var jr types.JoinResponse
+ var ok bool
+ var err error
+ for _, roomID := range joinedRoomIDs {
+ if typingUsers, updated := d.EDUCache.GetTypingUsersIfUpdatedAfter(
+ roomID, int64(since.EDUPosition()),
+ ); updated {
+ ev := gomatrixserverlib.ClientEvent{
+ Type: gomatrixserverlib.MTyping,
+ }
+ ev.Content, err = json.Marshal(map[string]interface{}{
+ "user_ids": typingUsers,
+ })
+ if err != nil {
+ return err
+ }
+
+ if jr, ok = res.Rooms.Join[roomID]; !ok {
+ jr = *types.NewJoinResponse()
+ }
+ jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
+ res.Rooms.Join[roomID] = jr
+ }
+ }
+ return nil
+}
+
+// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if
+// the positions of that type are not equal in fromPos and toPos.
+// TODO FIXME TEMPORARY PUBLIC
+func (d *Database) AddEDUDeltaToResponse(
+ fromPos, toPos types.StreamingToken,
+ joinedRoomIDs []string,
+ res *types.Response,
+) (err error) {
+
+ if fromPos.EDUPosition() != toPos.EDUPosition() {
+ err = d.addTypingDeltaToResponse(
+ fromPos, joinedRoomIDs, res,
+ )
+ }
+
+ return
+}
+
+func (d *Database) IncrementalSync(
+ ctx context.Context,
+ device authtypes.Device,
+ fromPos, toPos types.StreamingToken,
+ numRecentEventsPerRoom int,
+ wantFullState bool,
+) (*types.Response, error) {
+ nextBatchPos := fromPos.WithUpdates(toPos)
+ res := types.NewResponse(nextBatchPos)
+
+ var joinedRoomIDs []string
+ var err error
+ if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState {
+ joinedRoomIDs, err = d.addPDUDeltaToResponse(
+ ctx, device, fromPos.PDUPosition(), toPos.PDUPosition(), numRecentEventsPerRoom, wantFullState, res,
+ )
+ } else {
+ joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(
+ ctx, nil, device.UserID, gomatrixserverlib.Join,
+ )
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ err = d.AddEDUDeltaToResponse(
+ fromPos, toPos, joinedRoomIDs, res,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ return res, nil
+}
+
+// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed
+// to it. It returns toPos and joinedRoomIDs for use of adding EDUs.
+func (d *Database) getResponseWithPDUsForCompleteSync(
+ ctx context.Context,
+ userID string,
+ numRecentEventsPerRoom int,
+) (
+ res *types.Response,
+ toPos types.StreamingToken,
+ joinedRoomIDs []string,
+ err error,
+) {
+ // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
+ // a consistent view of the database throughout. This includes extracting the sync position.
+ // This does have the unfortunate side-effect that all the matrixy logic resides in this function,
+ // but it's better to not hide the fact that this is being done in a transaction.
+ txn, err := d.DB.BeginTx(ctx, &txReadOnlySnapshot)
+ if err != nil {
+ return
+ }
+ var succeeded bool
+ defer func() {
+ txerr := common.EndTransaction(txn, &succeeded)
+ if err == nil && txerr != nil {
+ err = txerr
+ }
+ }()
+
+ // Get the current sync position which we will base the sync response on.
+ toPos, err = d.SyncPositionTx(ctx, txn)
+ if err != nil {
+ return
+ }
+
+ res = types.NewResponse(toPos)
+
+ // Extract room state and recent events for all rooms the user is joined to.
+ joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
+ if err != nil {
+ return
+ }
+
+ stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
+
+ // Build up a /sync response. Add joined rooms.
+ for _, roomID := range joinedRoomIDs {
+ var stateEvents []gomatrixserverlib.HeaderedEvent
+ stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, &stateFilter)
+ if err != nil {
+ return
+ }
+ // TODO: When filters are added, we may need to call this multiple times to get enough events.
+ // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
+ var recentStreamEvents []types.StreamEvent
+ recentStreamEvents, err = d.OutputEvents.SelectRecentEvents(
+ ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(),
+ numRecentEventsPerRoom, true, true,
+ )
+ if err != nil {
+ return
+ }
+
+ // Retrieve the backward topology position, i.e. the position of the
+ // oldest event in the room's topology.
+ var prevBatchStr string
+ if len(recentStreamEvents) > 0 {
+ var backwardTopologyPos, backwardStreamPos types.StreamPosition
+ backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID())
+ if err != nil {
+ return
+ }
+ prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos)
+ prevBatch.Decrement()
+ prevBatchStr = prevBatch.String()
+ }
+
+ // We don't include a device here as we don't need to send down
+ // transaction IDs for complete syncs
+ recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents)
+ stateEvents = removeDuplicates(stateEvents, recentEvents)
+ jr := types.NewJoinResponse()
+ jr.Timeline.PrevBatch = prevBatchStr
+ jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+ jr.Timeline.Limited = true
+ jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
+ res.Rooms.Join[roomID] = *jr
+ }
+
+ if err = d.AddInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil {
+ return
+ }
+
+ succeeded = true
+ return res, toPos, joinedRoomIDs, err
+}
+
+func (d *Database) CompleteSync(
+ ctx context.Context, userID string, numRecentEventsPerRoom int,
+) (*types.Response, error) {
+ res, toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync(
+ ctx, userID, numRecentEventsPerRoom,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ // Use a zero value SyncPosition for fromPos so all EDU states are added.
+ err = d.AddEDUDeltaToResponse(
+ types.NewStreamToken(0, 0), toPos, joinedRoomIDs, res,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ return res, nil
+}
+
+var txReadOnlySnapshot = sql.TxOptions{
+ // Set the isolation level so that we see a snapshot of the database.
+ // In PostgreSQL repeatable read transactions will see a snapshot taken
+ // at the first query, and since the transaction is read-only it can't
+ // run into any serialisation errors.
+ // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ
+ Isolation: sql.LevelRepeatableRead,
+ ReadOnly: true,
+}
+
+// TODO FIXME temporary public
+func (d *Database) AddInvitesToResponse(
+ ctx context.Context, txn *sql.Tx,
+ userID string,
+ fromPos, toPos types.StreamPosition,
+ res *types.Response,
+) error {
+ invites, err := d.Invites.SelectInviteEventsInRange(
+ ctx, txn, userID, fromPos, toPos,
+ )
+ if err != nil {
+ return err
+ }
+ for roomID, inviteEvent := range invites {
+ ir := types.NewInviteResponse(inviteEvent)
+ res.Rooms.Invite[roomID] = *ir
+ }
+ return nil
+}
+
+// Retrieve the backward topology position, i.e. the position of the
+// oldest event in the room's topology.
+func (d *Database) getBackwardTopologyPos(
+ ctx context.Context, txn *sql.Tx,
+ events []types.StreamEvent,
+) (types.TopologyToken, error) {
+ zeroToken := types.NewTopologyToken(0, 0)
+ if len(events) == 0 {
+ return zeroToken, nil
+ }
+ pos, spos, err := d.Topology.SelectPositionInTopology(ctx, txn, events[0].EventID())
+ if err != nil {
+ return zeroToken, err
+ }
+ tok := types.NewTopologyToken(pos, spos)
+ tok.Decrement()
+ return tok, nil
+}
+
+// addRoomDeltaToResponse adds a room state delta to a sync response
+func (d *Database) addRoomDeltaToResponse(
+ ctx context.Context,
+ device *authtypes.Device,
+ txn *sql.Tx,
+ fromPos, toPos types.StreamPosition,
+ delta stateDelta,
+ numRecentEventsPerRoom int,
+ res *types.Response,
+) error {
+ endPos := toPos
+ if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave {
+ // make sure we don't leak recent events after the leave event.
+ // TODO: History visibility makes this somewhat complex to handle correctly. For example:
+ // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
+ // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
+ // in a single /sync request
+ // This is all "okay" assuming history_visibility == "shared" which it is by default.
+ endPos = delta.membershipPos
+ }
+ recentStreamEvents, err := d.OutputEvents.SelectRecentEvents(
+ ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos),
+ numRecentEventsPerRoom, true, true,
+ )
+ if err != nil {
+ return err
+ }
+ recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
+ delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
+ prevBatch, err := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents)
+ if err != nil {
+ return err
+ }
+
+ switch delta.membership {
+ case gomatrixserverlib.Join:
+ jr := types.NewJoinResponse()
+
+ jr.Timeline.PrevBatch = prevBatch.String()
+ jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+ jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
+ jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
+ res.Rooms.Join[delta.roomID] = *jr
+ case gomatrixserverlib.Leave:
+ fallthrough // transitions to leave are the same as ban
+ case gomatrixserverlib.Ban:
+ // TODO: recentEvents may contain events that this user is not allowed to see because they are
+ // no longer in the room.
+ lr := types.NewLeaveResponse()
+ lr.Timeline.PrevBatch = prevBatch.String()
+ lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+ lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
+ lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
+ res.Rooms.Leave[delta.roomID] = *lr
+ }
+
+ return nil
+}
+
+// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database.
+// Returns a map of room ID to list of events.
+func (d *Database) fetchStateEvents(
+ ctx context.Context, txn *sql.Tx,
+ roomIDToEventIDSet map[string]map[string]bool,
+ eventIDToEvent map[string]types.StreamEvent,
+) (map[string][]types.StreamEvent, error) {
+ stateBetween := make(map[string][]types.StreamEvent)
+ missingEvents := make(map[string][]string)
+ for roomID, ids := range roomIDToEventIDSet {
+ events := stateBetween[roomID]
+ for id, need := range ids {
+ if !need {
+ continue // deleted state
+ }
+ e, ok := eventIDToEvent[id]
+ if ok {
+ events = append(events, e)
+ } else {
+ m := missingEvents[roomID]
+ m = append(m, id)
+ missingEvents[roomID] = m
+ }
+ }
+ stateBetween[roomID] = events
+ }
+
+ if len(missingEvents) > 0 {
+ // This happens when add_state_ids has an event ID which is not in the provided range.
+ // We need to explicitly fetch them.
+ allMissingEventIDs := []string{}
+ for _, missingEvIDs := range missingEvents {
+ allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...)
+ }
+ evs, err := d.fetchMissingStateEvents(ctx, txn, allMissingEventIDs)
+ if err != nil {
+ return nil, err
+ }
+ // we know we got them all otherwise an error would've been returned, so just loop the events
+ for _, ev := range evs {
+ roomID := ev.RoomID()
+ stateBetween[roomID] = append(stateBetween[roomID], ev)
+ }
+ }
+ return stateBetween, nil
+}
+
+func (d *Database) fetchMissingStateEvents(
+ ctx context.Context, txn *sql.Tx, eventIDs []string,
+) ([]types.StreamEvent, error) {
+ // Fetch from the events table first so we pick up the stream ID for the
+ // event.
+ events, err := d.OutputEvents.SelectEvents(ctx, txn, eventIDs)
+ if err != nil {
+ return nil, err
+ }
+
+ have := map[string]bool{}
+ for _, event := range events {
+ have[event.EventID()] = true
+ }
+ var missing []string
+ for _, eventID := range eventIDs {
+ if !have[eventID] {
+ missing = append(missing, eventID)
+ }
+ }
+ if len(missing) == 0 {
+ return events, nil
+ }
+
+ // If they are missing from the events table then they should be state
+ // events that we received from outside the main event stream.
+ // These should be in the room state table.
+ stateEvents, err := d.CurrentRoomState.SelectEventsWithEventIDs(ctx, txn, missing)
+
+ if err != nil {
+ return nil, err
+ }
+ if len(stateEvents) != len(missing) {
+ return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing))
+ }
+ events = append(events, stateEvents...)
+ return events, nil
+}
+
+// getStateDeltas returns the state deltas between fromPos and toPos,
+// exclusive of oldPos, inclusive of newPos, for the rooms in which
+// the user has new membership events.
+// A list of joined room IDs is also returned in case the caller needs it.
+func (d *Database) getStateDeltas(
+ ctx context.Context, device *authtypes.Device, txn *sql.Tx,
+ fromPos, toPos types.StreamPosition, userID string,
+ stateFilter *gomatrixserverlib.StateFilter,
+) ([]stateDelta, []string, error) {
+ // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
+ // - Get membership list changes for this user in this sync response
+ // - For each room which has membership list changes:
+ // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO).
+ // If it is, then we need to send the full room state down (and 'limited' is always true).
+ // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
+ // * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block.
+ // - Get all CURRENTLY joined rooms, and add them to 'joined' block.
+ var deltas []stateDelta
+
+ // get all the state events ever between these two positions
+ stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter)
+ if err != nil {
+ return nil, nil, err
+ }
+ state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ for roomID, stateStreamEvents := range state {
+ for _, ev := range stateStreamEvents {
+ // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
+ // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this,
+ // dupe join events will result in the entire room state coming down to the client again. This is added in
+ // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
+ // the timeline.
+ if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
+ if membership == gomatrixserverlib.Join {
+ // send full room state down instead of a delta
+ var s []types.StreamEvent
+ s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilter)
+ if err != nil {
+ return nil, nil, err
+ }
+ state[roomID] = s
+ continue // we'll add this room in when we do joined rooms
+ }
+
+ deltas = append(deltas, stateDelta{
+ membership: membership,
+ membershipPos: ev.StreamPosition,
+ stateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
+ roomID: roomID,
+ })
+ break
+ }
+ }
+ }
+
+ // Add in currently joined rooms
+ joinedRoomIDs, err := d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
+ if err != nil {
+ return nil, nil, err
+ }
+ for _, joinedRoomID := range joinedRoomIDs {
+ deltas = append(deltas, stateDelta{
+ membership: gomatrixserverlib.Join,
+ stateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]),
+ roomID: joinedRoomID,
+ })
+ }
+
+ return deltas, joinedRoomIDs, nil
+}
+
+// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync
+// requests with full_state=true.
+// Fetches full state for all joined rooms and uses selectStateInRange to get
+// updates for other rooms.
+func (d *Database) getStateDeltasForFullStateSync(
+ ctx context.Context, device *authtypes.Device, txn *sql.Tx,
+ fromPos, toPos types.StreamPosition, userID string,
+ stateFilter *gomatrixserverlib.StateFilter,
+) ([]stateDelta, []string, error) {
+ joinedRoomIDs, err := d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // Use a reasonable initial capacity
+ deltas := make([]stateDelta, 0, len(joinedRoomIDs))
+
+ // Add full states for all joined rooms
+ for _, joinedRoomID := range joinedRoomIDs {
+ s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, stateFilter)
+ if stateErr != nil {
+ return nil, nil, stateErr
+ }
+ deltas = append(deltas, stateDelta{
+ membership: gomatrixserverlib.Join,
+ stateEvents: d.StreamEventsToEvents(device, s),
+ roomID: joinedRoomID,
+ })
+ }
+
+ // Get all the state events ever between these two positions
+ stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter)
+ if err != nil {
+ return nil, nil, err
+ }
+ state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ for roomID, stateStreamEvents := range state {
+ for _, ev := range stateStreamEvents {
+ if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
+ if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above.
+ deltas = append(deltas, stateDelta{
+ membership: membership,
+ membershipPos: ev.StreamPosition,
+ stateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
+ roomID: roomID,
+ })
+ }
+
+ break
+ }
+ }
+ }
+
+ return deltas, joinedRoomIDs, nil
+}
+
+func (d *Database) currentStateStreamEventsForRoom(
+ ctx context.Context, txn *sql.Tx, roomID string,
+ stateFilter *gomatrixserverlib.StateFilter,
+) ([]types.StreamEvent, error) {
+ allState, err := d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter)
+ if err != nil {
+ return nil, err
+ }
+ s := make([]types.StreamEvent, len(allState))
+ for i := 0; i < len(s); i++ {
+ s[i] = types.StreamEvent{HeaderedEvent: allState[i], StreamPosition: 0}
+ }
+ return s, nil
+}
+
+// There may be some overlap where events in stateEvents are already in recentEvents, so filter
+// them out so we don't include them twice in the /sync response. They should be in recentEvents
+// only, so clients get to the correct state once they have rolled forward.
+func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent {
+ for _, recentEv := range recentEvents {
+ if recentEv.StateKey() == nil {
+ continue // not a state event
+ }
+ // TODO: This is a linear scan over all the current state events in this room. This will
+ // be slow for big rooms. We should instead sort the state events by event ID (ORDER BY)
+ // then do a binary search to find matching events, similar to what roomserver does.
+ for j := 0; j < len(stateEvents); j++ {
+ if stateEvents[j].EventID() == recentEv.EventID() {
+ // overwrite the element to remove with the last element then pop the last element.
+ // This is orders of magnitude faster than re-slicing, but doesn't preserve ordering
+ // (we don't care about the order of stateEvents)
+ stateEvents[j] = stateEvents[len(stateEvents)-1]
+ stateEvents = stateEvents[:len(stateEvents)-1]
+ break // there shouldn't be multiple events with the same event ID
+ }
+ }
+ }
+ return stateEvents
+}
+
+// getMembershipFromEvent returns the value of content.membership iff the event is a state event
+// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
+func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
+ if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
+ membership, err := ev.Membership()
+ if err != nil {
+ return ""
+ }
+ return membership
+ }
+ return ""
+}
+
+type stateDelta struct {
+ roomID string
+ stateEvents []gomatrixserverlib.HeaderedEvent
+ membership string
+ // The PDU stream position of the latest membership event for this user, if applicable.
+ // Can be 0 if there is no membership event in this delta.
+ membershipPos types.StreamPosition
+}
diff --git a/syncapi/storage/sqlite3/backwards_extremities_table.go b/syncapi/storage/sqlite3/backwards_extremities_table.go
new file mode 100644
index 00000000..a172f6bf
--- /dev/null
+++ b/syncapi/storage/sqlite3/backwards_extremities_table.go
@@ -0,0 +1,106 @@
+// Copyright 2018 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
+)
+
+const backwardExtremitiesSchema = `
+-- Stores output room events received from the roomserver.
+CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
+ -- The 'room_id' key for the event.
+ room_id TEXT NOT NULL,
+ -- The event ID for the last known event. This is the backwards extremity.
+ event_id TEXT NOT NULL,
+ -- The prev_events for the last known event. This is used to update extremities.
+ prev_event_id TEXT NOT NULL,
+ PRIMARY KEY(room_id, event_id, prev_event_id)
+);
+`
+
+const insertBackwardExtremitySQL = "" +
+ "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" +
+ " VALUES ($1, $2, $3)" +
+ " ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING"
+
+const selectBackwardExtremitiesForRoomSQL = "" +
+ "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
+
+const deleteBackwardExtremitySQL = "" +
+ "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
+
+type backwardExtremitiesStatements struct {
+ insertBackwardExtremityStmt *sql.Stmt
+ selectBackwardExtremitiesForRoomStmt *sql.Stmt
+ deleteBackwardExtremityStmt *sql.Stmt
+}
+
+func NewSqliteBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremities, error) {
+ s := &backwardExtremitiesStatements{}
+ _, err := db.Exec(backwardExtremitiesSchema)
+ if err != nil {
+ return nil, err
+ }
+ if s.insertBackwardExtremityStmt, err = db.Prepare(insertBackwardExtremitySQL); err != nil {
+ return nil, err
+ }
+ if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil {
+ return nil, err
+ }
+ if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
+ return nil, err
+ }
+ return s, nil
+}
+
+func (s *backwardExtremitiesStatements) InsertsBackwardExtremity(
+ ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string,
+) (err error) {
+ _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID)
+ return
+}
+
+func (s *backwardExtremitiesStatements) SelectBackwardExtremitiesForRoom(
+ ctx context.Context, roomID string,
+) (eventIDs []string, err error) {
+ rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
+ if err != nil {
+ return
+ }
+ defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed")
+
+ for rows.Next() {
+ var eID string
+ if err = rows.Scan(&eID); err != nil {
+ return
+ }
+
+ eventIDs = append(eventIDs, eID)
+ }
+
+ return eventIDs, rows.Err()
+}
+
+func (s *backwardExtremitiesStatements) DeleteBackwardExtremity(
+ ctx context.Context, txn *sql.Tx, roomID, knownEventID string,
+) (err error) {
+ _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
+ return
+}
diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go
index 9fafdbed..b540fbc6 100644
--- a/syncapi/storage/sqlite3/current_room_state_table.go
+++ b/syncapi/storage/sqlite3/current_room_state_table.go
@@ -22,6 +22,7 @@ import (
"strings"
"github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -91,35 +92,37 @@ type currentRoomStateStatements struct {
selectStateEventStmt *sql.Stmt
}
-func (s *currentRoomStateStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) {
- s.streamIDStatements = streamID
- _, err = db.Exec(currentRoomStateSchema)
+func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *streamIDStatements) (tables.CurrentRoomState, error) {
+ s := &currentRoomStateStatements{
+ streamIDStatements: streamID,
+ }
+ _, err := db.Exec(currentRoomStateSchema)
if err != nil {
- return
+ return nil, err
}
if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil {
- return
+ return nil, err
}
if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil {
- return
+ return nil, err
}
if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
- return
+ return nil, err
}
if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil {
- return
+ return nil, err
}
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
- return
+ return nil, err
}
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
- return
+ return nil, err
}
- return
+ return s, nil
}
// JoinedMemberLists returns a map of room ID to a list of joined user IDs.
-func (s *currentRoomStateStatements) selectJoinedUsers(
+func (s *currentRoomStateStatements) SelectJoinedUsers(
ctx context.Context,
) (map[string][]string, error) {
rows, err := s.selectJoinedUsersStmt.QueryContext(ctx)
@@ -143,7 +146,7 @@ func (s *currentRoomStateStatements) selectJoinedUsers(
}
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
-func (s *currentRoomStateStatements) selectRoomIDsWithMembership(
+func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
ctx context.Context,
txn *sql.Tx,
userID string,
@@ -168,7 +171,7 @@ func (s *currentRoomStateStatements) selectRoomIDsWithMembership(
}
// CurrentState returns all the current state events for the given room.
-func (s *currentRoomStateStatements) selectCurrentState(
+func (s *currentRoomStateStatements) SelectCurrentState(
ctx context.Context, txn *sql.Tx, roomID string,
stateFilterPart *gomatrixserverlib.StateFilter,
) ([]gomatrixserverlib.HeaderedEvent, error) {
@@ -189,7 +192,7 @@ func (s *currentRoomStateStatements) selectCurrentState(
return rowsToEvents(rows)
}
-func (s *currentRoomStateStatements) deleteRoomStateByEventID(
+func (s *currentRoomStateStatements) DeleteRoomStateByEventID(
ctx context.Context, txn *sql.Tx, eventID string,
) error {
stmt := common.TxStmt(txn, s.deleteRoomStateByEventIDStmt)
@@ -197,7 +200,7 @@ func (s *currentRoomStateStatements) deleteRoomStateByEventID(
return err
}
-func (s *currentRoomStateStatements) upsertRoomState(
+func (s *currentRoomStateStatements) UpsertRoomState(
ctx context.Context, txn *sql.Tx,
event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition,
) error {
@@ -231,7 +234,7 @@ func (s *currentRoomStateStatements) upsertRoomState(
return err
}
-func (s *currentRoomStateStatements) selectEventsWithEventIDs(
+func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
ctx context.Context, txn *sql.Tx, eventIDs []string,
) ([]types.StreamEvent, error) {
iEventIDs := make([]interface{}, len(eventIDs))
@@ -264,7 +267,7 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.HeaderedEvent, error) {
return result, nil
}
-func (s *currentRoomStateStatements) selectStateEvent(
+func (s *currentRoomStateStatements) SelectStateEvent(
ctx context.Context, roomID, evType, stateKey string,
) (*gomatrixserverlib.HeaderedEvent, error) {
stmt := s.selectStateEventStmt
diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go
index 0d313d7c..4469f5b7 100644
--- a/syncapi/storage/sqlite3/output_room_events_topology_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go
@@ -19,6 +19,7 @@ import (
"database/sql"
"github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -77,35 +78,36 @@ type outputRoomEventsTopologyStatements struct {
selectEventIDsFromPositionStmt *sql.Stmt
}
-func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) {
- _, err = db.Exec(outputRoomEventsTopologySchema)
+func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
+ s := &outputRoomEventsTopologyStatements{}
+ _, err := db.Exec(outputRoomEventsTopologySchema)
if err != nil {
- return
+ return nil, err
}
if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil {
- return
+ return nil, err
}
if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil {
- return
+ return nil, err
}
if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil {
- return
+ return nil, err
}
if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil {
- return
+ return nil, err
}
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
- return
+ return nil, err
}
if s.selectEventIDsFromPositionStmt, err = db.Prepare(selectEventIDsFromPositionSQL); err != nil {
- return
+ return nil, err
}
- return
+ return s, nil
}
// insertEventInTopology inserts the given event in the room's topology, based
// on the event's depth.
-func (s *outputRoomEventsTopologyStatements) insertEventInTopology(
+func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
) (err error) {
stmt := common.TxStmt(txn, s.insertEventInTopologyStmt)
@@ -118,7 +120,7 @@ func (s *outputRoomEventsTopologyStatements) insertEventInTopology(
// selectEventIDsInRange selects the IDs of events which positions are within a
// given range in a given room's topological order.
// Returns an empty slice if no events match the given range.
-func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange(
+func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
ctx context.Context, txn *sql.Tx, roomID string,
fromPos, toPos, toMicroPos types.StreamPosition,
limit int, chronologicalOrder bool,
@@ -155,7 +157,7 @@ func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange(
// selectPositionInTopology returns the position of a given event in the
// topology of the room it belongs to.
-func (s *outputRoomEventsTopologyStatements) selectPositionInTopology(
+func (s *outputRoomEventsTopologyStatements) SelectPositionInTopology(
ctx context.Context, txn *sql.Tx, eventID string,
) (pos types.StreamPosition, spos types.StreamPosition, err error) {
stmt := common.TxStmt(txn, s.selectPositionInTopologyStmt)
@@ -163,7 +165,7 @@ func (s *outputRoomEventsTopologyStatements) selectPositionInTopology(
return
}
-func (s *outputRoomEventsTopologyStatements) selectMaxPositionInTopology(
+func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
ctx context.Context, txn *sql.Tx, roomID string,
) (pos types.StreamPosition, spos types.StreamPosition, err error) {
stmt := common.TxStmt(txn, s.selectMaxPositionInTopologyStmt)
@@ -173,7 +175,7 @@ func (s *outputRoomEventsTopologyStatements) selectMaxPositionInTopology(
// selectEventIDsFromPosition returns the IDs of all events that have a given
// position in the topology of a given room.
-func (s *outputRoomEventsTopologyStatements) selectEventIDsFromPosition(
+func (s *outputRoomEventsTopologyStatements) SelectEventIDsFromPosition(
ctx context.Context, txn *sql.Tx, roomID string, pos types.StreamPosition,
) (eventIDs []string, err error) {
// Query the event IDs.
diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go
index 0da05eab..e23aeca4 100644
--- a/syncapi/storage/sqlite3/syncserver.go
+++ b/syncapi/storage/sqlite3/syncserver.go
@@ -16,18 +16,11 @@
package sqlite3
import (
- "context"
"database/sql"
- "encoding/json"
"errors"
- "fmt"
"net/url"
- "github.com/sirupsen/logrus"
-
- "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/internal/sqlutil"
- "github.com/matrix-org/dendrite/roomserver/api"
// Import the sqlite3 package
_ "github.com/mattn/go-sqlite3"
@@ -35,35 +28,20 @@ import (
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
- "github.com/matrix-org/dendrite/syncapi/storage/tables"
- "github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
)
-type stateDelta struct {
- roomID string
- stateEvents []gomatrixserverlib.HeaderedEvent
- membership string
- // The PDU stream position of the latest membership event for this user, if applicable.
- // Can be 0 if there is no membership event in this delta.
- membershipPos types.StreamPosition
-}
-
// SyncServerDatasource represents a sync server datasource which manages
// both the database for PDUs and caches for EDUs.
type SyncServerDatasource struct {
shared.Database
db *sql.DB
common.PartitionOffsetStatements
- streamID streamIDStatements
- roomstate currentRoomStateStatements
- topology outputRoomEventsTopologyStatements
- backwardExtremities tables.BackwardsExtremities
+ streamID streamIDStatements
}
-// NewSyncServerDatasource creates a new sync server database
+// NewDatabase creates a new sync server database
// nolint: gocyclo
-func NewSyncServerDatasource(dataSourceName string) (*SyncServerDatasource, error) {
+func NewDatabase(dataSourceName string) (*SyncServerDatasource, error) {
var d SyncServerDatasource
uri, err := url.Parse(dataSourceName)
if err != nil {
@@ -101,982 +79,31 @@ func (d *SyncServerDatasource) prepare() (err error) {
if err != nil {
return err
}
- if err = d.roomstate.prepare(d.db, &d.streamID); err != nil {
+ roomState, err := NewSqliteCurrentRoomStateTable(d.db, &d.streamID)
+ if err != nil {
return err
}
invites, err := NewSqliteInvitesTable(d.db, &d.streamID)
if err != nil {
return err
}
- if err = d.topology.prepare(d.db); err != nil {
- return err
- }
- d.backwardExtremities, err = tables.NewBackwardsExtremities(d.db, &tables.SqliteBackwardsExtremitiesStatements{})
+ topology, err := NewSqliteTopologyTable(d.db)
if err != nil {
return err
}
- d.Database = shared.Database{
- DB: d.db,
- Invites: invites,
- AccountData: accountData,
- OutputEvents: events,
- EDUCache: cache.New(),
- }
- return nil
-}
-
-// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
-func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) {
- return d.roomstate.selectJoinedUsers(ctx)
-}
-
-// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of
-// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
-// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.
-func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
- if err := d.backwardExtremities.DeleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
- return err
- }
-
- // Check if we have all of the event's previous events. If an event is
- // missing, add it to the room's backward extremities.
- prevEvents, err := d.Database.OutputEvents.SelectEvents(ctx, txn, ev.PrevEventIDs())
- if err != nil {
- return err
- }
- var found bool
- for _, eID := range ev.PrevEventIDs() {
- found = false
- for _, prevEv := range prevEvents {
- if eID == prevEv.EventID() {
- found = true
- }
- }
-
- // If the event is missing, consider it a backward extremity.
- if !found {
- if err = d.backwardExtremities.InsertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil {
- return err
- }
- }
- }
-
- return nil
-}
-
-// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
-// when generating the sync stream position for this event. Returns the sync stream position for the inserted event.
-// Returns an error if there was a problem inserting this event.
-func (d *SyncServerDatasource) WriteEvent(
- ctx context.Context,
- ev *gomatrixserverlib.HeaderedEvent,
- addStateEvents []gomatrixserverlib.HeaderedEvent,
- addStateEventIDs, removeStateEventIDs []string,
- transactionID *api.TransactionID, excludeFromSync bool,
-) (pduPosition types.StreamPosition, returnErr error) {
- returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error {
- var err error
- pos, err := d.Database.OutputEvents.InsertEvent(
- ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync,
- )
- if err != nil {
- return err
- }
- pduPosition = pos
-
- if err = d.topology.insertEventInTopology(ctx, txn, ev, pos); err != nil {
- return err
- }
-
- if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil {
- return err
- }
-
- if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 {
- // Nothing to do, the event may have just been a message event.
- return nil
- }
-
- return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition)
- })
-
- return pduPosition, returnErr
-}
-
-func (d *SyncServerDatasource) updateRoomState(
- ctx context.Context, txn *sql.Tx,
- removedEventIDs []string,
- addedEvents []gomatrixserverlib.HeaderedEvent,
- pduPosition types.StreamPosition,
-) error {
- // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
- for _, eventID := range removedEventIDs {
- if err := d.roomstate.deleteRoomStateByEventID(ctx, txn, eventID); err != nil {
- return err
- }
- }
-
- for _, event := range addedEvents {
- if event.StateKey() == nil {
- // ignore non state events
- continue
- }
- var membership *string
- if event.Type() == "m.room.member" {
- value, err := event.Membership()
- if err != nil {
- return err
- }
- membership = &value
- }
- if err := d.roomstate.upsertRoomState(ctx, txn, event, membership, pduPosition); err != nil {
- return err
- }
- }
-
- return nil
-}
-
-// SyncPosition returns the latest positions for syncing.
-func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) {
- err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
- pos, err := d.syncPositionTx(ctx, txn)
- if err != nil {
- return err
- }
- tok = *pos
- return nil
- })
- return
-}
-
-// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key
-// If no event could be found, returns nil
-// If there was an issue during the retrieval, returns an error
-func (d *SyncServerDatasource) GetStateEvent(
- ctx context.Context, roomID, evType, stateKey string,
-) (*gomatrixserverlib.HeaderedEvent, error) {
- return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey)
-}
-
-// GetStateEventsForRoom fetches the state events for a given room.
-// Returns an empty slice if no state events could be found for this room.
-// Returns an error if there was an issue with the retrieval.
-func (d *SyncServerDatasource) GetStateEventsForRoom(
- ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter,
-) (stateEvents []gomatrixserverlib.HeaderedEvent, err error) {
- err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
- stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilterPart)
- return err
- })
- return
-}
-
-// GetEventsInTopologicalRange retrieves all of the events on a given ordering using the
-// given extremities and limit.
-func (d *SyncServerDatasource) GetEventsInTopologicalRange(
- ctx context.Context,
- from, to *types.TopologyToken,
- roomID string, limit int,
- backwardOrdering bool,
-) (events []types.StreamEvent, err error) {
- // TODO: ARGH CONFUSING
- // Determine the backward and forward limit, i.e. the upper and lower
- // limits to the selection in the room's topology, from the direction.
- var backwardLimit, forwardLimit, forwardMicroLimit types.StreamPosition
- if backwardOrdering {
- // Backward ordering is antichronological (latest event to oldest
- // one).
- backwardLimit = to.Depth()
- forwardLimit = from.Depth()
- forwardMicroLimit = from.PDUPosition()
- } else {
- // Forward ordering is chronological (oldest event to latest one).
- backwardLimit = from.Depth()
- forwardLimit = to.Depth()
- }
-
- // Select the event IDs from the defined range.
- var eIDs []string
- eIDs, err = d.topology.selectEventIDsInRange(
- ctx, nil, roomID, backwardLimit, forwardLimit, forwardMicroLimit, limit, !backwardOrdering,
- )
+ bwExtrem, err := NewSqliteBackwardsExtremitiesTable(d.db)
if err != nil {
- return
- }
-
- // Retrieve the events' contents using their IDs.
- events, err = d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs)
- return
-}
-
-// BackwardExtremitiesForRoom returns the event IDs of all of the backward
-// extremities we know of for a given room.
-func (d *SyncServerDatasource) BackwardExtremitiesForRoom(
- ctx context.Context, roomID string,
-) (backwardExtremities []string, err error) {
- return d.backwardExtremities.SelectBackwardExtremitiesForRoom(ctx, roomID)
-}
-
-// MaxTopologicalPosition returns the highest topological position for a given
-// room.
-func (d *SyncServerDatasource) MaxTopologicalPosition(
- ctx context.Context, roomID string,
-) (types.StreamPosition, types.StreamPosition, error) {
- return d.topology.selectMaxPositionInTopology(ctx, nil, roomID)
-}
-
-// EventsAtTopologicalPosition returns all of the events matching a given
-// position in the topology of a given room.
-func (d *SyncServerDatasource) EventsAtTopologicalPosition(
- ctx context.Context, roomID string, pos types.StreamPosition,
-) ([]types.StreamEvent, error) {
- eIDs, err := d.topology.selectEventIDsFromPosition(ctx, nil, roomID, pos)
- if err != nil {
- return nil, err
- }
-
- return d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs)
-}
-
-func (d *SyncServerDatasource) EventPositionInTopology(
- ctx context.Context, eventID string,
-) (depth types.StreamPosition, stream types.StreamPosition, err error) {
- return d.topology.selectPositionInTopology(ctx, nil, eventID)
-}
-
-// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
-func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (pos types.StreamPosition, err error) {
- err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
- pos, err = d.syncStreamPositionTx(ctx, txn)
return err
- })
- return
-}
-
-func (d *SyncServerDatasource) syncStreamPositionTx(
- ctx context.Context, txn *sql.Tx,
-) (types.StreamPosition, error) {
- maxID, err := d.Database.OutputEvents.SelectMaxEventID(ctx, txn)
- if err != nil {
- return 0, err
- }
- maxAccountDataID, err := d.Database.AccountData.SelectMaxAccountDataID(ctx, txn)
- if err != nil {
- return 0, err
- }
- if maxAccountDataID > maxID {
- maxID = maxAccountDataID
- }
- maxInviteID, err := d.Database.Invites.SelectMaxInviteID(ctx, txn)
- if err != nil {
- return 0, err
- }
- if maxInviteID > maxID {
- maxID = maxInviteID
- }
- return types.StreamPosition(maxID), nil
-}
-
-func (d *SyncServerDatasource) syncPositionTx(
- ctx context.Context, txn *sql.Tx,
-) (*types.StreamingToken, error) {
-
- maxEventID, err := d.Database.OutputEvents.SelectMaxEventID(ctx, txn)
- if err != nil {
- return nil, err
- }
- maxAccountDataID, err := d.Database.AccountData.SelectMaxAccountDataID(ctx, txn)
- if err != nil {
- return nil, err
- }
- if maxAccountDataID > maxEventID {
- maxEventID = maxAccountDataID
- }
- maxInviteID, err := d.Database.Invites.SelectMaxInviteID(ctx, txn)
- if err != nil {
- return nil, err
- }
- if maxInviteID > maxEventID {
- maxEventID = maxInviteID
- }
- sp := types.NewStreamToken(
- types.StreamPosition(maxEventID),
- types.StreamPosition(d.Database.EDUCache.GetLatestSyncPosition()),
- )
- return &sp, nil
-}
-
-// addPDUDeltaToResponse adds all PDU deltas to a sync response.
-// IDs of all rooms the user joined are returned so EDU deltas can be added for them.
-func (d *SyncServerDatasource) addPDUDeltaToResponse(
- ctx context.Context,
- device authtypes.Device,
- fromPos, toPos types.StreamPosition,
- numRecentEventsPerRoom int,
- wantFullState bool,
- res *types.Response,
-) (joinedRoomIDs []string, err error) {
- txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
- if err != nil {
- return nil, err
- }
- var succeeded bool
- defer func() {
- txerr := common.EndTransaction(txn, &succeeded)
- if err == nil && txerr != nil {
- err = txerr
- }
- }()
-
- stateFilterPart := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
-
- // Work out which rooms to return in the response. This is done by getting not only the currently
- // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions.
- // This works out what the 'state' key should be for each room as well as which membership block
- // to put the room into.
- var deltas []stateDelta
- if !wantFullState {
- deltas, joinedRoomIDs, err = d.getStateDeltas(
- ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilterPart,
- )
- } else {
- deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(
- ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilterPart,
- )
- }
- if err != nil {
- return nil, err
- }
-
- for _, delta := range deltas {
- err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res)
- if err != nil {
- return nil, err
- }
- }
-
- // TODO: This should be done in getStateDeltas
- if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil {
- return nil, err
- }
-
- succeeded = true
- return joinedRoomIDs, nil
-}
-
-// addTypingDeltaToResponse adds all typing notifications to a sync response
-// since the specified position.
-func (d *SyncServerDatasource) addTypingDeltaToResponse(
- since types.StreamingToken,
- joinedRoomIDs []string,
- res *types.Response,
-) error {
- var jr types.JoinResponse
- var ok bool
- var err error
- for _, roomID := range joinedRoomIDs {
- if typingUsers, updated := d.Database.EDUCache.GetTypingUsersIfUpdatedAfter(
- roomID, int64(since.EDUPosition()),
- ); updated {
- ev := gomatrixserverlib.ClientEvent{
- Type: gomatrixserverlib.MTyping,
- }
- ev.Content, err = json.Marshal(map[string]interface{}{
- "user_ids": typingUsers,
- })
- if err != nil {
- return err
- }
-
- if jr, ok = res.Rooms.Join[roomID]; !ok {
- jr = *types.NewJoinResponse()
- }
- jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
- res.Rooms.Join[roomID] = jr
- }
- }
- return nil
-}
-
-// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if
-// the positions of that type are not equal in fromPos and toPos.
-func (d *SyncServerDatasource) addEDUDeltaToResponse(
- fromPos, toPos types.StreamingToken,
- joinedRoomIDs []string,
- res *types.Response,
-) (err error) {
-
- if fromPos.EDUPosition() != toPos.EDUPosition() {
- err = d.addTypingDeltaToResponse(
- fromPos, joinedRoomIDs, res,
- )
- }
-
- return
-}
-
-// IncrementalSync returns all the data needed in order to create an incremental
-// sync response for the given user. Events returned will include any client
-// transaction IDs associated with the given device. These transaction IDs come
-// from when the device sent the event via an API that included a transaction
-// ID.
-func (d *SyncServerDatasource) IncrementalSync(
- ctx context.Context,
- device authtypes.Device,
- fromPos, toPos types.StreamingToken,
- numRecentEventsPerRoom int,
- wantFullState bool,
-) (*types.Response, error) {
- nextBatchPos := fromPos.WithUpdates(toPos)
- res := types.NewResponse(nextBatchPos)
-
- var joinedRoomIDs []string
- var err error
- fmt.Println("from", fromPos.PDUPosition(), "to", toPos.PDUPosition())
- if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState {
- joinedRoomIDs, err = d.addPDUDeltaToResponse(
- ctx, device, fromPos.PDUPosition(), toPos.PDUPosition(), numRecentEventsPerRoom, wantFullState, res,
- )
- } else {
- joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(
- ctx, nil, device.UserID, gomatrixserverlib.Join,
- )
- }
- if err != nil {
- return nil, err
}
-
- err = d.addEDUDeltaToResponse(
- fromPos, toPos, joinedRoomIDs, res,
- )
- if err != nil {
- return nil, err
- }
-
- return res, nil
-}
-
-// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed
-// to it. It returns toPos and joinedRoomIDs for use of adding EDUs.
-func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
- ctx context.Context,
- userID string,
- numRecentEventsPerRoom int,
-) (
- res *types.Response,
- toPos *types.StreamingToken,
- joinedRoomIDs []string,
- err error,
-) {
- // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
- // a consistent view of the database throughout. This includes extracting the sync position.
- // This does have the unfortunate side-effect that all the matrixy logic resides in this function,
- // but it's better to not hide the fact that this is being done in a transaction.
- txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
- if err != nil {
- return
- }
- var succeeded bool
- defer func() {
- txerr := common.EndTransaction(txn, &succeeded)
- if err == nil && txerr != nil {
- err = txerr
- }
- }()
-
- // Get the current sync position which we will base the sync response on.
- toPos, err = d.syncPositionTx(ctx, txn)
- if err != nil {
- return
- }
-
- res = types.NewResponse(*toPos)
-
- // Extract room state and recent events for all rooms the user is joined to.
- joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
- if err != nil {
- return
- }
-
- stateFilterPart := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
-
- // Build up a /sync response. Add joined rooms.
- for _, roomID := range joinedRoomIDs {
- var stateEvents []gomatrixserverlib.HeaderedEvent
- stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilterPart)
- if err != nil {
- return
- }
- //fmt.Println("State events:", stateEvents)
- // TODO: When filters are added, we may need to call this multiple times to get enough events.
- // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
- var recentStreamEvents []types.StreamEvent
- recentStreamEvents, err = d.Database.OutputEvents.SelectRecentEvents(
- ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(),
- numRecentEventsPerRoom, true, true,
- )
- if err != nil {
- return
- }
- //fmt.Println("Recent stream events:", recentStreamEvents)
-
- // Retrieve the backward topology position, i.e. the position of the
- // oldest event in the room's topology.
- var prevBatchStr string
- if len(recentStreamEvents) > 0 {
- var backwardTopologyPos, backwardStreamPos types.StreamPosition
- backwardTopologyPos, backwardStreamPos, err = d.topology.selectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID())
- if err != nil {
- return
- }
- prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos)
- prevBatch.Decrement()
- prevBatchStr = prevBatch.String()
- }
-
- // We don't include a device here as we don't need to send down
- // transaction IDs for complete syncs
- recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents)
- stateEvents = removeDuplicates(stateEvents, recentEvents)
- jr := types.NewJoinResponse()
- jr.Timeline.PrevBatch = prevBatchStr
- jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- jr.Timeline.Limited = true
- jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
- res.Rooms.Join[roomID] = *jr
- }
-
- if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil {
- return
- }
-
- succeeded = true
- return res, toPos, joinedRoomIDs, err
-}
-
-// CompleteSync returns a complete /sync API response for the given user.
-func (d *SyncServerDatasource) CompleteSync(
- ctx context.Context, userID string, numRecentEventsPerRoom int,
-) (*types.Response, error) {
- res, toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync(
- ctx, userID, numRecentEventsPerRoom,
- )
- if err != nil {
- return nil, err
- }
-
- // Use a zero value SyncPosition for fromPos so all EDU states are added.
- err = d.addEDUDeltaToResponse(
- types.NewStreamToken(0, 0), *toPos, joinedRoomIDs, res,
- )
- if err != nil {
- return nil, err
- }
-
- return res, nil
-}
-
-var txReadOnlySnapshot = sql.TxOptions{
- // Set the isolation level so that we see a snapshot of the database.
- // In PostgreSQL repeatable read transactions will see a snapshot taken
- // at the first query, and since the transaction is read-only it can't
- // run into any serialisation errors.
- // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ
- Isolation: sql.LevelRepeatableRead,
- ReadOnly: true,
-}
-
-func (d *SyncServerDatasource) addInvitesToResponse(
- ctx context.Context, txn *sql.Tx,
- userID string,
- fromPos, toPos types.StreamPosition,
- res *types.Response,
-) error {
- invites, err := d.Database.Invites.SelectInviteEventsInRange(
- ctx, txn, userID, fromPos, toPos,
- )
- if err != nil {
- return err
- }
- for roomID, inviteEvent := range invites {
- ir := types.NewInviteResponse(inviteEvent)
- res.Rooms.Invite[roomID] = *ir
- }
- return nil
-}
-
-// Retrieve the backward topology position, i.e. the position of the
-// oldest event in the room's topology.
-func (d *SyncServerDatasource) getBackwardTopologyPos(
- ctx context.Context, txn *sql.Tx,
- events []types.StreamEvent,
-) (pos, spos types.StreamPosition) {
- if len(events) > 0 {
- pos, spos, _ = d.topology.selectPositionInTopology(ctx, txn, events[0].EventID())
- }
- // go to the previous position so we don't pull out the same event twice
- // FIXME: This could be done more nicely by being explicit with inclusive/exclusive rules
- if pos-1 <= 0 {
- pos = types.StreamPosition(1)
- } else {
- pos = pos - 1
- spos += 1000 // this has to be bigger than the number of events we backfill per request
- }
- return
-}
-
-// addRoomDeltaToResponse adds a room state delta to a sync response
-func (d *SyncServerDatasource) addRoomDeltaToResponse(
- ctx context.Context,
- device *authtypes.Device,
- txn *sql.Tx,
- fromPos, toPos types.StreamPosition,
- delta stateDelta,
- numRecentEventsPerRoom int,
- res *types.Response,
-) error {
- endPos := toPos
- if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave {
- // make sure we don't leak recent events after the leave event.
- // TODO: History visibility makes this somewhat complex to handle correctly. For example:
- // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
- // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
- // in a single /sync request
- // This is all "okay" assuming history_visibility == "shared" which it is by default.
- endPos = delta.membershipPos
- }
- recentStreamEvents, err := d.Database.OutputEvents.SelectRecentEvents(
- ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos),
- numRecentEventsPerRoom, true, true,
- )
- if err != nil {
- return err
- }
- recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
- delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents)
- backwardTopologyPos, backwardStreamPos := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents)
- prevBatch := types.NewTopologyToken(
- backwardTopologyPos, backwardStreamPos,
- )
-
- switch delta.membership {
- case gomatrixserverlib.Join:
- jr := types.NewJoinResponse()
- jr.Timeline.PrevBatch = prevBatch.String()
- jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
- jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
- res.Rooms.Join[delta.roomID] = *jr
- case gomatrixserverlib.Leave:
- fallthrough // transitions to leave are the same as ban
- case gomatrixserverlib.Ban:
- // TODO: recentEvents may contain events that this user is not allowed to see because they are
- // no longer in the room.
- lr := types.NewLeaveResponse()
- lr.Timeline.PrevBatch = prevBatch.String()
- lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
- lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
- res.Rooms.Leave[delta.roomID] = *lr
+ d.Database = shared.Database{
+ DB: d.db,
+ Invites: invites,
+ AccountData: accountData,
+ OutputEvents: events,
+ BackwardExtremities: bwExtrem,
+ CurrentRoomState: roomState,
+ Topology: topology,
+ EDUCache: cache.New(),
}
-
return nil
}
-
-// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database.
-// Returns a map of room ID to list of events.
-func (d *SyncServerDatasource) fetchStateEvents(
- ctx context.Context, txn *sql.Tx,
- roomIDToEventIDSet map[string]map[string]bool,
- eventIDToEvent map[string]types.StreamEvent,
-) (map[string][]types.StreamEvent, error) {
- stateBetween := make(map[string][]types.StreamEvent)
- missingEvents := make(map[string][]string)
- for roomID, ids := range roomIDToEventIDSet {
- events := stateBetween[roomID]
- for id, need := range ids {
- if !need {
- continue // deleted state
- }
- e, ok := eventIDToEvent[id]
- if ok {
- events = append(events, e)
- } else {
- m := missingEvents[roomID]
- m = append(m, id)
- missingEvents[roomID] = m
- }
- }
- stateBetween[roomID] = events
- }
-
- if len(missingEvents) > 0 {
- // This happens when add_state_ids has an event ID which is not in the provided range.
- // We need to explicitly fetch them.
- allMissingEventIDs := []string{}
- for _, missingEvIDs := range missingEvents {
- allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...)
- }
- evs, err := d.fetchMissingStateEvents(ctx, txn, allMissingEventIDs)
- if err != nil {
- return nil, err
- }
- // we know we got them all otherwise an error would've been returned, so just loop the events
- for _, ev := range evs {
- roomID := ev.RoomID()
- stateBetween[roomID] = append(stateBetween[roomID], ev)
- }
- }
- return stateBetween, nil
-}
-
-func (d *SyncServerDatasource) fetchMissingStateEvents(
- ctx context.Context, txn *sql.Tx, eventIDs []string,
-) ([]types.StreamEvent, error) {
- // Fetch from the events table first so we pick up the stream ID for the
- // event.
- events, err := d.Database.OutputEvents.SelectEvents(ctx, txn, eventIDs)
- if err != nil {
- return nil, err
- }
-
- have := map[string]bool{}
- for _, event := range events {
- have[event.EventID()] = true
- }
- var missing []string
- for _, eventID := range eventIDs {
- if !have[eventID] {
- missing = append(missing, eventID)
- }
- }
- if len(missing) == 0 {
- return events, nil
- }
-
- // If they are missing from the events table then they should be state
- // events that we received from outside the main event stream.
- // These should be in the room state table.
- stateEvents, err := d.roomstate.selectEventsWithEventIDs(ctx, txn, missing)
-
- if err != nil {
- return nil, err
- }
- if len(stateEvents) != len(missing) {
- return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing))
- }
- events = append(events, stateEvents...)
- return events, nil
-}
-
-// getStateDeltas returns the state deltas between fromPos and toPos,
-// exclusive of oldPos, inclusive of newPos, for the rooms in which
-// the user has new membership events.
-// A list of joined room IDs is also returned in case the caller needs it.
-func (d *SyncServerDatasource) getStateDeltas(
- ctx context.Context, device *authtypes.Device, txn *sql.Tx,
- fromPos, toPos types.StreamPosition, userID string,
- stateFilterPart *gomatrixserverlib.StateFilter,
-) ([]stateDelta, []string, error) {
- // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
- // - Get membership list changes for this user in this sync response
- // - For each room which has membership list changes:
- // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO).
- // If it is, then we need to send the full room state down (and 'limited' is always true).
- // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
- // * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block.
- // - Get all CURRENTLY joined rooms, and add them to 'joined' block.
- var deltas []stateDelta
-
- // get all the state events ever between these two positions
- stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart)
- if err != nil {
- return nil, nil, err
- }
- state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
- if err != nil {
- return nil, nil, err
- }
-
- for roomID, stateStreamEvents := range state {
- for _, ev := range stateStreamEvents {
- // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
- // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this,
- // dupe join events will result in the entire room state coming down to the client again. This is added in
- // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
- // the timeline.
- if membership := getMembershipFromEvent(&ev.HeaderedEvent, userID); membership != "" {
- if membership == gomatrixserverlib.Join {
- // send full room state down instead of a delta
- var s []types.StreamEvent
- s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilterPart)
- if err != nil {
- return nil, nil, err
- }
- state[roomID] = s
- continue // we'll add this room in when we do joined rooms
- }
-
- deltas = append(deltas, stateDelta{
- membership: membership,
- membershipPos: ev.StreamPosition,
- stateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
- roomID: roomID,
- })
- break
- }
- }
- }
-
- // Add in currently joined rooms
- joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
- if err != nil {
- return nil, nil, err
- }
- for _, joinedRoomID := range joinedRoomIDs {
- deltas = append(deltas, stateDelta{
- membership: gomatrixserverlib.Join,
- stateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]),
- roomID: joinedRoomID,
- })
- }
-
- return deltas, joinedRoomIDs, nil
-}
-
-// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync
-// requests with full_state=true.
-// Fetches full state for all joined rooms and uses selectStateInRange to get
-// updates for other rooms.
-func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
- ctx context.Context, device *authtypes.Device, txn *sql.Tx,
- fromPos, toPos types.StreamPosition, userID string,
- stateFilterPart *gomatrixserverlib.StateFilter,
-) ([]stateDelta, []string, error) {
- joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
- if err != nil {
- return nil, nil, err
- }
-
- // Use a reasonable initial capacity
- deltas := make([]stateDelta, 0, len(joinedRoomIDs))
-
- // Add full states for all joined rooms
- for _, joinedRoomID := range joinedRoomIDs {
- s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, stateFilterPart)
- if stateErr != nil {
- return nil, nil, stateErr
- }
- deltas = append(deltas, stateDelta{
- membership: gomatrixserverlib.Join,
- stateEvents: d.StreamEventsToEvents(device, s),
- roomID: joinedRoomID,
- })
- }
-
- // Get all the state events ever between these two positions
- stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart)
- if err != nil {
- return nil, nil, err
- }
- state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
- if err != nil {
- return nil, nil, err
- }
-
- for roomID, stateStreamEvents := range state {
- for _, ev := range stateStreamEvents {
- if membership := getMembershipFromEvent(&ev.HeaderedEvent, userID); membership != "" {
- if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above.
- deltas = append(deltas, stateDelta{
- membership: membership,
- membershipPos: ev.StreamPosition,
- stateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
- roomID: roomID,
- })
- }
-
- break
- }
- }
- }
-
- return deltas, joinedRoomIDs, nil
-}
-
-func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
- ctx context.Context, txn *sql.Tx, roomID string,
- stateFilterPart *gomatrixserverlib.StateFilter,
-) ([]types.StreamEvent, error) {
- allState, err := d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilterPart)
- if err != nil {
- return nil, err
- }
- s := make([]types.StreamEvent, len(allState))
- for i := 0; i < len(s); i++ {
- s[i] = types.StreamEvent{HeaderedEvent: allState[i], StreamPosition: 0}
- }
- return s, nil
-}
-
-// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
-// matches the streamevent.transactionID device then the transaction ID gets
-// added to the unsigned section of the output event.
-func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent {
- out := make([]gomatrixserverlib.HeaderedEvent, len(in))
- for i := 0; i < len(in); i++ {
- out[i] = in[i].HeaderedEvent
- if device != nil && in[i].TransactionID != nil {
- if device.UserID == in[i].Sender() && device.SessionID == in[i].TransactionID.SessionID {
- err := out[i].SetUnsignedField(
- "transaction_id", in[i].TransactionID.TransactionID,
- )
- if err != nil {
- logrus.WithFields(logrus.Fields{
- "event_id": out[i].EventID(),
- }).WithError(err).Warnf("Failed to add transaction ID to event")
- }
- }
- }
- }
- return out
-}
-
-// There may be some overlap where events in stateEvents are already in recentEvents, so filter
-// them out so we don't include them twice in the /sync response. They should be in recentEvents
-// only, so clients get to the correct state once they have rolled forward.
-func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent {
- for _, recentEv := range recentEvents {
- if recentEv.StateKey() == nil {
- continue // not a state event
- }
- // TODO: This is a linear scan over all the current state events in this room. This will
- // be slow for big rooms. We should instead sort the state events by event ID (ORDER BY)
- // then do a binary search to find matching events, similar to what roomserver does.
- for j := 0; j < len(stateEvents); j++ {
- if stateEvents[j].EventID() == recentEv.EventID() {
- // overwrite the element to remove with the last element then pop the last element.
- // This is orders of magnitude faster than re-slicing, but doesn't preserve ordering
- // (we don't care about the order of stateEvents)
- stateEvents[j] = stateEvents[len(stateEvents)-1]
- stateEvents = stateEvents[:len(stateEvents)-1]
- break // there shouldn't be multiple events with the same event ID
- }
- }
- }
- return stateEvents
-}
-
-// getMembershipFromEvent returns the value of content.membership iff the event is a state event
-// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
-func getMembershipFromEvent(ev *gomatrixserverlib.HeaderedEvent, userID string) string {
- if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
- membership, err := ev.Membership()
- if err != nil {
- return ""
- }
- return membership
- }
- return ""
-}
diff --git a/syncapi/storage/storage.go b/syncapi/storage/storage.go
index 9574f37b..76a4b7a4 100644
--- a/syncapi/storage/storage.go
+++ b/syncapi/storage/storage.go
@@ -28,14 +28,14 @@ import (
func NewSyncServerDatasource(dataSourceName string, dbProperties common.DbProperties) (Database, error) {
uri, err := url.Parse(dataSourceName)
if err != nil {
- return postgres.NewSyncServerDatasource(dataSourceName, dbProperties)
+ return postgres.NewDatabase(dataSourceName, dbProperties)
}
switch uri.Scheme {
case "postgres":
- return postgres.NewSyncServerDatasource(dataSourceName, dbProperties)
+ return postgres.NewDatabase(dataSourceName, dbProperties)
case "file":
- return sqlite3.NewSyncServerDatasource(dataSourceName)
+ return sqlite3.NewDatabase(dataSourceName)
default:
- return postgres.NewSyncServerDatasource(dataSourceName, dbProperties)
+ return postgres.NewDatabase(dataSourceName, dbProperties)
}
}
diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go
index f7fa1a87..bffcfd05 100644
--- a/syncapi/storage/storage_test.go
+++ b/syncapi/storage/storage_test.go
@@ -51,7 +51,7 @@ func MustCreateEvent(t *testing.T, roomID string, prevs []gomatrixserverlib.Head
}
func MustCreateDatabase(t *testing.T) storage.Database {
- db, err := sqlite3.NewSyncServerDatasource("file::memory:")
+ db, err := sqlite3.NewDatabase("file::memory:")
if err != nil {
t.Fatalf("NewSyncServerDatasource returned %s", err)
}
diff --git a/syncapi/storage/storage_wasm.go b/syncapi/storage/storage_wasm.go
index 84bd9df9..666179af 100644
--- a/syncapi/storage/storage_wasm.go
+++ b/syncapi/storage/storage_wasm.go
@@ -35,7 +35,7 @@ func NewSyncServerDatasource(
case "postgres":
return nil, fmt.Errorf("Cannot use postgres implementation")
case "file":
- return sqlite3.NewSyncServerDatasource(dataSourceName)
+ return sqlite3.NewDatabase(dataSourceName)
default:
return nil, fmt.Errorf("Cannot use postgres implementation")
}
diff --git a/syncapi/storage/tables/backward_extremities.go b/syncapi/storage/tables/backward_extremities.go
deleted file mode 100644
index babd9aaa..00000000
--- a/syncapi/storage/tables/backward_extremities.go
+++ /dev/null
@@ -1,175 +0,0 @@
-// Copyright 2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package tables
-
-import (
- "context"
- "database/sql"
-
- "github.com/matrix-org/dendrite/common"
-)
-
-// BackwardsExtremitiesStatements contains the SQL statements to implement.
-// See BackwardsExtremities to see the parameter and response types.
-type BackwardsExtremitiesStatements interface {
- Schema() string
- InsertBackwardExtremity() string
- SelectBackwardExtremitiesForRoom() string
- DeleteBackwardExtremity() string
-}
-
-type PostgresBackwardsExtremitiesStatements struct{}
-
-func (s *PostgresBackwardsExtremitiesStatements) Schema() string {
- return `-- Stores output room events received from the roomserver.
- CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
- -- The 'room_id' key for the event.
- room_id TEXT NOT NULL,
- -- The event ID for the last known event. This is the backwards extremity.
- event_id TEXT NOT NULL,
- -- The prev_events for the last known event. This is used to update extremities.
- prev_event_id TEXT NOT NULL,
-
- PRIMARY KEY(room_id, event_id, prev_event_id)
- );
- `
-}
-func (s *PostgresBackwardsExtremitiesStatements) InsertBackwardExtremity() string {
- return "" +
- "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" +
- " VALUES ($1, $2, $3)" +
- " ON CONFLICT DO NOTHING"
-}
-func (s *PostgresBackwardsExtremitiesStatements) SelectBackwardExtremitiesForRoom() string {
- return "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
-}
-func (s *PostgresBackwardsExtremitiesStatements) DeleteBackwardExtremity() string {
- return "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
-}
-
-type SqliteBackwardsExtremitiesStatements struct{}
-
-func (s *SqliteBackwardsExtremitiesStatements) Schema() string {
- return `-- Stores output room events received from the roomserver.
-CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
- -- The 'room_id' key for the event.
- room_id TEXT NOT NULL,
- -- The event ID for the last known event. This is the backwards extremity.
- event_id TEXT NOT NULL,
- -- The prev_events for the last known event. This is used to update extremities.
- prev_event_id TEXT NOT NULL,
-
- PRIMARY KEY(room_id, event_id, prev_event_id)
-);
-`
-}
-
-func (s *SqliteBackwardsExtremitiesStatements) InsertBackwardExtremity() string {
- return "" +
- "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" +
- " VALUES ($1, $2, $3)" +
- " ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING"
-}
-
-func (s *SqliteBackwardsExtremitiesStatements) SelectBackwardExtremitiesForRoom() string {
- return "" +
- "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
-}
-
-func (s *SqliteBackwardsExtremitiesStatements) DeleteBackwardExtremity() string {
- return "" +
- "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
-}
-
-// BackwardsExtremities keeps track of backwards extremities for a room.
-// Backwards extremities are the earliest (DAG-wise) known events which we have
-// the entire event JSON. These event IDs are used in federation requests to fetch
-// even earlier events.
-//
-// We persist the previous event IDs as well, one per row, so when we do fetch even
-// earlier events we can simply delete rows which referenced it. Consider the graph:
-// A
-// | Event C has 1 prev_event ID: A.
-// B C
-// |___| Event D has 2 prev_event IDs: B and C.
-// |
-// D
-// The earliest known event we have is D, so this table has 2 rows.
-// A backfill request gives us C but not B. We delete rows where prev_event=C. This
-// still means that D is a backwards extremity as we do not have event B. However, event
-// C is *also* a backwards extremity at this point as we do not have event A. Later,
-// when we fetch event B, we delete rows where prev_event=B. This then removes D as
-// a backwards extremity because there are no more rows with event_id=B.
-type BackwardsExtremities struct {
- insertBackwardExtremityStmt *sql.Stmt
- selectBackwardExtremitiesForRoomStmt *sql.Stmt
- deleteBackwardExtremityStmt *sql.Stmt
-}
-
-// NewBackwardsExtremities prepares the table
-func NewBackwardsExtremities(db *sql.DB, stmts BackwardsExtremitiesStatements) (table BackwardsExtremities, err error) {
- _, err = db.Exec(stmts.Schema())
- if err != nil {
- return
- }
- if table.insertBackwardExtremityStmt, err = db.Prepare(stmts.InsertBackwardExtremity()); err != nil {
- return
- }
- if table.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(stmts.SelectBackwardExtremitiesForRoom()); err != nil {
- return
- }
- if table.deleteBackwardExtremityStmt, err = db.Prepare(stmts.DeleteBackwardExtremity()); err != nil {
- return
- }
- return
-}
-
-// InsertsBackwardExtremity inserts a new backwards extremity.
-func (s *BackwardsExtremities) InsertsBackwardExtremity(
- ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string,
-) (err error) {
- _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID)
- return
-}
-
-// SelectBackwardExtremitiesForRoom retrieves all backwards extremities for the room.
-func (s *BackwardsExtremities) SelectBackwardExtremitiesForRoom(
- ctx context.Context, roomID string,
-) (eventIDs []string, err error) {
- rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
- if err != nil {
- return
- }
- defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed")
-
- for rows.Next() {
- var eID string
- if err = rows.Scan(&eID); err != nil {
- return
- }
-
- eventIDs = append(eventIDs, eID)
- }
-
- return eventIDs, rows.Err()
-}
-
-// DeleteBackwardExtremity removes a backwards extremity for a room, if one existed.
-func (s *BackwardsExtremities) DeleteBackwardExtremity(
- ctx context.Context, txn *sql.Tx, roomID, knownEventID string,
-) (err error) {
- _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
- return
-}
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index ffd50216..8f0b8b89 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -30,3 +30,61 @@ type Events interface {
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]types.StreamEvent, error)
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
}
+
+type Topology interface {
+ // InsertEventInTopology inserts the given event in the room's topology, based
+ // on the event's depth.
+ InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (err error)
+ // SelectEventIDsInRange selects the IDs of events which positions are within a
+ // given range in a given room's topological order.
+ // Returns an empty slice if no events match the given range.
+ SelectEventIDsInRange(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos, toMicroPos types.StreamPosition, limit int, chronologicalOrder bool) (eventIDs []string, err error)
+ // SelectPositionInTopology returns the position of a given event in the
+ // topology of the room it belongs to.
+ SelectPositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (pos, spos types.StreamPosition, err error)
+ SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
+ // SelectEventIDsFromPosition returns the IDs of all events that have a given
+ // position in the topology of a given room.
+ SelectEventIDsFromPosition(ctx context.Context, txn *sql.Tx, roomID string, pos types.StreamPosition) (eventIDs []string, err error)
+}
+
+type CurrentRoomState interface {
+ SelectStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
+ SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
+ UpsertRoomState(ctx context.Context, txn *sql.Tx, event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition) error
+ DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error
+ // SelectCurrentState returns all the current state events for the given room.
+ SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter) ([]gomatrixserverlib.HeaderedEvent, error)
+ // SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
+ SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error)
+ // SelectJoinedUsers returns a map of room ID to a list of joined user IDs.
+ SelectJoinedUsers(ctx context.Context) (map[string][]string, error)
+}
+
+// BackwardsExtremities keeps track of backwards extremities for a room.
+// Backwards extremities are the earliest (DAG-wise) known events which we have
+// the entire event JSON. These event IDs are used in federation requests to fetch
+// even earlier events.
+//
+// We persist the previous event IDs as well, one per row, so when we do fetch even
+// earlier events we can simply delete rows which referenced it. Consider the graph:
+// A
+// | Event C has 1 prev_event ID: A.
+// B C
+// |___| Event D has 2 prev_event IDs: B and C.
+// |
+// D
+// The earliest known event we have is D, so this table has 2 rows.
+// A backfill request gives us C but not B. We delete rows where prev_event=C. This
+// still means that D is a backwards extremity as we do not have event B. However, event
+// C is *also* a backwards extremity at this point as we do not have event A. Later,
+// when we fetch event B, we delete rows where prev_event=B. This then removes D as
+// a backwards extremity because there are no more rows with event_id=B.
+type BackwardsExtremities interface {
+ // InsertsBackwardExtremity inserts a new backwards extremity.
+ InsertsBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string) (err error)
+ // SelectBackwardExtremitiesForRoom retrieves all backwards extremities for the room.
+ SelectBackwardExtremitiesForRoom(ctx context.Context, roomID string) (eventIDs []string, err error)
+ // DeleteBackwardExtremity removes a backwards extremity for a room, if one existed.
+ DeleteBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, knownEventID string) (err error)
+}