aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/shared/syncserver.go
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2021-01-08 16:59:06 +0000
committerGitHub <noreply@github.com>2021-01-08 16:59:06 +0000
commitb5a8935042dfb358f4176bc1ca46d0b8ebd62615 (patch)
treeb3b0409cc184f6392eb0230f2127d10d0752b4af /syncapi/storage/shared/syncserver.go
parent56a7839aedfdf849193bf25c0e2fcd8f4a0146d8 (diff)
Sync refactor — Part 1 (#1688)
* It's half-alive * Wakeups largely working * Other tweaks, typing works * Fix bugs, add receipt stream * Delete notifier, other tweaks * Dedupe a bit, add a template for the invite stream * Clean up, add templates for other streams * Don't leak channels * Bring forward some more PDU logic, clean up other places * Add some more wakeups * Use addRoomDeltaToResponse * Log tweaks, typing fixed? * Fix timed out syncs * Don't reset next batch position on timeout * Add account data stream/position * End of day * Fix complete sync for receipt, typing * Streams package * Clean up a bit * Complete sync send-to-device * Don't drop errors * More lightweight notifications * Fix typing positions * Don't advance position on remove again unless needed * Device list updates * Advance account data position * Use limit for incremental sync * Limit fixes, amongst other things * Remove some fmt.Println * Tweaks * Re-add notifier * Fix invite position * Fixes * Notify account data without advancing PDU position in notifier * Apply account data position * Get initial position for account data * Fix position update * Fix complete sync positions * Review comments @Kegsay * Room consumer parameters
Diffstat (limited to 'syncapi/storage/shared/syncserver.go')
-rw-r--r--syncapi/storage/shared/syncserver.go780
1 files changed, 134 insertions, 646 deletions
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index ba9403a5..ebb99673 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -19,12 +19,10 @@ import (
"database/sql"
"encoding/json"
"fmt"
- "time"
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
- "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -49,7 +47,78 @@ type Database struct {
SendToDevice tables.SendToDevice
Filter tables.Filter
Receipts tables.Receipts
- EDUCache *cache.EDUCache
+}
+
+func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
+ return d.DB.BeginTx(ctx, &sql.TxOptions{
+ // Set the isolation level so that we see a snapshot of the database.
+ // In PostgreSQL repeatable read transactions will see a snapshot taken
+ // at the first query, and since the transaction is read-only it can't
+ // run into any serialisation errors.
+ // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ
+ Isolation: sql.LevelRepeatableRead,
+ ReadOnly: true,
+ })
+}
+
+func (d *Database) MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) {
+ id, err := d.OutputEvents.SelectMaxEventID(ctx, nil)
+ if err != nil {
+ return 0, fmt.Errorf("d.OutputEvents.SelectMaxEventID: %w", err)
+ }
+ return types.StreamPosition(id), nil
+}
+
+func (d *Database) MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error) {
+ id, err := d.Receipts.SelectMaxReceiptID(ctx, nil)
+ if err != nil {
+ return 0, fmt.Errorf("d.Receipts.SelectMaxReceiptID: %w", err)
+ }
+ return types.StreamPosition(id), nil
+}
+
+func (d *Database) MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error) {
+ id, err := d.Invites.SelectMaxInviteID(ctx, nil)
+ if err != nil {
+ return 0, fmt.Errorf("d.Invites.SelectMaxInviteID: %w", err)
+ }
+ return types.StreamPosition(id), nil
+}
+
+func (d *Database) MaxStreamPositionForAccountData(ctx context.Context) (types.StreamPosition, error) {
+ id, err := d.AccountData.SelectMaxAccountDataID(ctx, nil)
+ if err != nil {
+ return 0, fmt.Errorf("d.Invites.SelectMaxAccountDataID: %w", err)
+ }
+ return types.StreamPosition(id), nil
+}
+
+func (d *Database) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) {
+ return d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilterPart)
+}
+
+func (d *Database) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) {
+ return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, nil, userID, membership)
+}
+
+func (d *Database) RecentEvents(ctx context.Context, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) {
+ return d.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, limit, chronologicalOrder, onlySyncEvents)
+}
+
+func (d *Database) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) {
+ return d.Topology.SelectPositionInTopology(ctx, nil, eventID)
+}
+
+func (d *Database) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) {
+ return d.Invites.SelectInviteEventsInRange(ctx, nil, targetUserID, r)
+}
+
+func (d *Database) PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) {
+ return d.Peeks.SelectPeeksInRange(ctx, nil, userID, deviceID, r)
+}
+
+func (d *Database) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) {
+ return d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos)
}
// Events lookups a list of event by their event ID.
@@ -99,6 +168,7 @@ func (d *Database) GetEventsInStreamingRange(
return events, err
}
+/*
func (d *Database) AddTypingUser(
userID, roomID string, expireTime *time.Time,
) types.StreamPosition {
@@ -111,13 +181,16 @@ func (d *Database) RemoveTypingUser(
return types.StreamPosition(d.EDUCache.RemoveUser(userID, roomID))
}
-func (d *Database) AddSendToDevice() types.StreamPosition {
- return types.StreamPosition(d.EDUCache.AddSendToDeviceMessage())
-}
-
func (d *Database) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) {
d.EDUCache.SetTimeoutCallback(fn)
}
+*/
+
+/*
+func (d *Database) AddSendToDevice() types.StreamPosition {
+ return types.StreamPosition(d.EDUCache.AddSendToDeviceMessage())
+}
+*/
func (d *Database) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) {
return d.CurrentRoomState.SelectJoinedUsers(ctx)
@@ -416,18 +489,6 @@ func (d *Database) GetEventsInTopologicalRange(
return
}
-func (d *Database) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) {
- err = sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error {
- pos, err := d.syncPositionTx(ctx, txn)
- if err != nil {
- return err
- }
- tok = pos
- return nil
- })
- return
-}
-
func (d *Database) BackwardExtremitiesForRoom(
ctx context.Context, roomID string,
) (backwardExtremities map[string][]string, err error) {
@@ -454,215 +515,6 @@ func (d *Database) EventPositionInTopology(
return types.TopologyToken{Depth: depth, PDUPosition: stream}, nil
}
-func (d *Database) syncPositionTx(
- ctx context.Context, txn *sql.Tx,
-) (sp types.StreamingToken, err error) {
- maxEventID, err := d.OutputEvents.SelectMaxEventID(ctx, txn)
- if err != nil {
- return sp, err
- }
- maxAccountDataID, err := d.AccountData.SelectMaxAccountDataID(ctx, txn)
- if err != nil {
- return sp, err
- }
- if maxAccountDataID > maxEventID {
- maxEventID = maxAccountDataID
- }
- maxInviteID, err := d.Invites.SelectMaxInviteID(ctx, txn)
- if err != nil {
- return sp, err
- }
- if maxInviteID > maxEventID {
- maxEventID = maxInviteID
- }
- maxPeekID, err := d.Peeks.SelectMaxPeekID(ctx, txn)
- if err != nil {
- return sp, err
- }
- if maxPeekID > maxEventID {
- maxEventID = maxPeekID
- }
- maxReceiptID, err := d.Receipts.SelectMaxReceiptID(ctx, txn)
- if err != nil {
- return sp, err
- }
- // TODO: complete these positions
- sp = types.StreamingToken{
- PDUPosition: types.StreamPosition(maxEventID),
- TypingPosition: types.StreamPosition(d.EDUCache.GetLatestSyncPosition()),
- ReceiptPosition: types.StreamPosition(maxReceiptID),
- InvitePosition: types.StreamPosition(maxInviteID),
- }
- return
-}
-
-// addPDUDeltaToResponse adds all PDU deltas to a sync response.
-// IDs of all rooms the user joined are returned so EDU deltas can be added for them.
-func (d *Database) addPDUDeltaToResponse(
- ctx context.Context,
- device userapi.Device,
- r types.Range,
- numRecentEventsPerRoom int,
- wantFullState bool,
- res *types.Response,
-) (joinedRoomIDs []string, err error) {
- txn, err := d.DB.BeginTx(ctx, &txReadOnlySnapshot)
- if err != nil {
- return nil, err
- }
- succeeded := false
- defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
-
- stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
-
- // Work out which rooms to return in the response. This is done by getting not only the currently
- // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions.
- // This works out what the 'state' key should be for each room as well as which membership block
- // to put the room into.
- var deltas []stateDelta
- if !wantFullState {
- deltas, joinedRoomIDs, err = d.getStateDeltas(
- ctx, &device, txn, r, device.UserID, &stateFilter,
- )
- if err != nil {
- return nil, fmt.Errorf("d.getStateDeltas: %w", err)
- }
- } else {
- deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(
- ctx, &device, txn, r, device.UserID, &stateFilter,
- )
- if err != nil {
- return nil, fmt.Errorf("d.getStateDeltasForFullStateSync: %w", err)
- }
- }
-
- for _, delta := range deltas {
- err = d.addRoomDeltaToResponse(ctx, &device, txn, r, delta, numRecentEventsPerRoom, res)
- if err != nil {
- return nil, fmt.Errorf("d.addRoomDeltaToResponse: %w", err)
- }
- }
-
- succeeded = true
- return joinedRoomIDs, nil
-}
-
-// addTypingDeltaToResponse adds all typing notifications to a sync response
-// since the specified position.
-func (d *Database) addTypingDeltaToResponse(
- since types.StreamingToken,
- joinedRoomIDs []string,
- res *types.Response,
-) error {
- var ok bool
- var err error
- for _, roomID := range joinedRoomIDs {
- var jr types.JoinResponse
- if typingUsers, updated := d.EDUCache.GetTypingUsersIfUpdatedAfter(
- roomID, int64(since.TypingPosition),
- ); updated {
- ev := gomatrixserverlib.ClientEvent{
- Type: gomatrixserverlib.MTyping,
- }
- ev.Content, err = json.Marshal(map[string]interface{}{
- "user_ids": typingUsers,
- })
- if err != nil {
- return err
- }
-
- if jr, ok = res.Rooms.Join[roomID]; !ok {
- jr = *types.NewJoinResponse()
- }
- jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
- res.Rooms.Join[roomID] = jr
- }
- }
- res.NextBatch.TypingPosition = types.StreamPosition(d.EDUCache.GetLatestSyncPosition())
- return nil
-}
-
-// addReceiptDeltaToResponse adds all receipt information to a sync response
-// since the specified position
-func (d *Database) addReceiptDeltaToResponse(
- since types.StreamingToken,
- joinedRoomIDs []string,
- res *types.Response,
-) error {
- lastPos, receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.ReceiptPosition)
- if err != nil {
- return fmt.Errorf("unable to select receipts for rooms: %w", err)
- }
-
- // Group receipts by room, so we can create one ClientEvent for every room
- receiptsByRoom := make(map[string][]eduAPI.OutputReceiptEvent)
- for _, receipt := range receipts {
- receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt)
- }
-
- for roomID, receipts := range receiptsByRoom {
- var jr types.JoinResponse
- var ok bool
-
- // Make sure we use an existing JoinResponse if there is one.
- // If not, we'll create a new one
- if jr, ok = res.Rooms.Join[roomID]; !ok {
- jr = types.JoinResponse{}
- }
-
- ev := gomatrixserverlib.ClientEvent{
- Type: gomatrixserverlib.MReceipt,
- RoomID: roomID,
- }
- content := make(map[string]eduAPI.ReceiptMRead)
- for _, receipt := range receipts {
- var read eduAPI.ReceiptMRead
- if read, ok = content[receipt.EventID]; !ok {
- read = eduAPI.ReceiptMRead{
- User: make(map[string]eduAPI.ReceiptTS),
- }
- }
- read.User[receipt.UserID] = eduAPI.ReceiptTS{TS: receipt.Timestamp}
- content[receipt.EventID] = read
- }
- ev.Content, err = json.Marshal(content)
- if err != nil {
- return err
- }
-
- jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
- res.Rooms.Join[roomID] = jr
- }
-
- res.NextBatch.ReceiptPosition = lastPos
- return nil
-}
-
-// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if
-// the positions of that type are not equal in fromPos and toPos.
-func (d *Database) addEDUDeltaToResponse(
- fromPos, toPos types.StreamingToken,
- joinedRoomIDs []string,
- res *types.Response,
-) error {
- if fromPos.TypingPosition != toPos.TypingPosition {
- // add typing deltas
- if err := d.addTypingDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil {
- return fmt.Errorf("unable to apply typing delta to response: %w", err)
- }
- }
-
- // Check on initial sync and if EDUPositions differ
- if (fromPos.ReceiptPosition == 0 && toPos.ReceiptPosition == 0) ||
- fromPos.ReceiptPosition != toPos.ReceiptPosition {
- if err := d.addReceiptDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil {
- return fmt.Errorf("unable to apply receipts to response: %w", err)
- }
- }
-
- return nil
-}
-
func (d *Database) GetFilter(
ctx context.Context, localpart string, filterID string,
) (*gomatrixserverlib.Filter, error) {
@@ -681,57 +533,6 @@ func (d *Database) PutFilter(
return filterID, err
}
-func (d *Database) IncrementalSync(
- ctx context.Context, res *types.Response,
- device userapi.Device,
- fromPos, toPos types.StreamingToken,
- numRecentEventsPerRoom int,
- wantFullState bool,
-) (*types.Response, error) {
- res.NextBatch = fromPos.WithUpdates(toPos)
-
- var joinedRoomIDs []string
- var err error
- if fromPos.PDUPosition != toPos.PDUPosition || wantFullState {
- r := types.Range{
- From: fromPos.PDUPosition,
- To: toPos.PDUPosition,
- }
- joinedRoomIDs, err = d.addPDUDeltaToResponse(
- ctx, device, r, numRecentEventsPerRoom, wantFullState, res,
- )
- if err != nil {
- return nil, fmt.Errorf("d.addPDUDeltaToResponse: %w", err)
- }
- } else {
- joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(
- ctx, nil, device.UserID, gomatrixserverlib.Join,
- )
- if err != nil {
- return nil, fmt.Errorf("d.CurrentRoomState.SelectRoomIDsWithMembership: %w", err)
- }
- }
-
- // TODO: handle EDUs in peeked rooms
-
- err = d.addEDUDeltaToResponse(
- fromPos, toPos, joinedRoomIDs, res,
- )
- if err != nil {
- return nil, fmt.Errorf("d.addEDUDeltaToResponse: %w", err)
- }
-
- ir := types.Range{
- From: fromPos.InvitePosition,
- To: toPos.InvitePosition,
- }
- if err = d.addInvitesToResponse(ctx, nil, device.UserID, ir, res); err != nil {
- return nil, fmt.Errorf("d.addInvitesToResponse: %w", err)
- }
-
- return res, nil
-}
-
func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error {
redactedEvents, err := d.Events(ctx, []string{redactedEventID})
if err != nil {
@@ -755,240 +556,17 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda
return err
}
-// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed
-// to it. It returns toPos and joinedRoomIDs for use of adding EDUs.
-// nolint:nakedret
-func (d *Database) getResponseWithPDUsForCompleteSync(
- ctx context.Context, res *types.Response,
- userID string, device userapi.Device,
- numRecentEventsPerRoom int,
-) (
- toPos types.StreamingToken,
- joinedRoomIDs []string,
- err error,
-) {
- // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
- // a consistent view of the database throughout. This includes extracting the sync position.
- // This does have the unfortunate side-effect that all the matrixy logic resides in this function,
- // but it's better to not hide the fact that this is being done in a transaction.
- txn, err := d.DB.BeginTx(ctx, &txReadOnlySnapshot)
- if err != nil {
- return
- }
- succeeded := false
- defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
-
- // Get the current sync position which we will base the sync response on.
- toPos, err = d.syncPositionTx(ctx, txn)
- if err != nil {
- return
- }
- r := types.Range{
- From: 0,
- To: toPos.PDUPosition,
- }
- ir := types.Range{
- From: 0,
- To: toPos.InvitePosition,
- }
-
- res.NextBatch.ApplyUpdates(toPos)
-
- // Extract room state and recent events for all rooms the user is joined to.
- joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
- if err != nil {
- return
- }
-
- stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
-
- // Build up a /sync response. Add joined rooms.
- for _, roomID := range joinedRoomIDs {
- var jr *types.JoinResponse
- jr, err = d.getJoinResponseForCompleteSync(
- ctx, txn, roomID, r, &stateFilter, numRecentEventsPerRoom, device,
- )
- if err != nil {
- return
- }
- res.Rooms.Join[roomID] = *jr
- }
-
- // Add peeked rooms.
- peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, device.ID, r)
- if err != nil {
- return
- }
- for _, peek := range peeks {
- if !peek.Deleted {
- var jr *types.JoinResponse
- jr, err = d.getJoinResponseForCompleteSync(
- ctx, txn, peek.RoomID, r, &stateFilter, numRecentEventsPerRoom, device,
- )
- if err != nil {
- return
- }
- res.Rooms.Peek[peek.RoomID] = *jr
- }
- }
-
- if err = d.addInvitesToResponse(ctx, txn, userID, ir, res); err != nil {
- return
- }
-
- succeeded = true
- return //res, toPos, joinedRoomIDs, err
-}
-
-func (d *Database) getJoinResponseForCompleteSync(
- ctx context.Context, txn *sql.Tx,
- roomID string,
- r types.Range,
- stateFilter *gomatrixserverlib.StateFilter,
- numRecentEventsPerRoom int, device userapi.Device,
-) (jr *types.JoinResponse, err error) {
- var stateEvents []*gomatrixserverlib.HeaderedEvent
- stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter)
- if err != nil {
- return
- }
- // TODO: When filters are added, we may need to call this multiple times to get enough events.
- // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
- var recentStreamEvents []types.StreamEvent
- var limited bool
- recentStreamEvents, limited, err = d.OutputEvents.SelectRecentEvents(
- ctx, txn, roomID, r, numRecentEventsPerRoom, true, true,
- )
- if err != nil {
- return
- }
-
- // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
- // user shouldn't see, we check the recent events and remove any prior to the join event of the user
- // which is equiv to history_visibility: joined
- joinEventIndex := -1
- for i := len(recentStreamEvents) - 1; i >= 0; i-- {
- ev := recentStreamEvents[i]
- if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) {
- membership, _ := ev.Membership()
- if membership == "join" {
- joinEventIndex = i
- if i > 0 {
- // the create event happens before the first join, so we should cut it at that point instead
- if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") {
- joinEventIndex = i - 1
- break
- }
- }
- break
- }
- }
- }
- if joinEventIndex != -1 {
- // cut all events earlier than the join (but not the join itself)
- recentStreamEvents = recentStreamEvents[joinEventIndex:]
- limited = false // so clients know not to try to backpaginate
- }
-
- // Retrieve the backward topology position, i.e. the position of the
- // oldest event in the room's topology.
- var prevBatch *types.TopologyToken
- if len(recentStreamEvents) > 0 {
- var backwardTopologyPos, backwardStreamPos types.StreamPosition
- backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID())
- if err != nil {
- return
- }
- prevBatch = &types.TopologyToken{
- Depth: backwardTopologyPos,
- PDUPosition: backwardStreamPos,
- }
- prevBatch.Decrement()
- }
-
- // We don't include a device here as we don't need to send down
- // transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
- // "Can sync a room with a message with a transaction id" - which does a complete sync to check.
- recentEvents := d.StreamEventsToEvents(&device, recentStreamEvents)
- stateEvents = removeDuplicates(stateEvents, recentEvents)
- jr = types.NewJoinResponse()
- jr.Timeline.PrevBatch = prevBatch
- jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- jr.Timeline.Limited = limited
- jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
- return jr, nil
-}
-
-func (d *Database) CompleteSync(
- ctx context.Context, res *types.Response,
- device userapi.Device, numRecentEventsPerRoom int,
-) (*types.Response, error) {
- toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync(
- ctx, res, device.UserID, device, numRecentEventsPerRoom,
- )
- if err != nil {
- return nil, fmt.Errorf("d.getResponseWithPDUsForCompleteSync: %w", err)
- }
-
- // TODO: handle EDUs in peeked rooms
-
- // Use a zero value SyncPosition for fromPos so all EDU states are added.
- err = d.addEDUDeltaToResponse(
- types.StreamingToken{}, toPos, joinedRoomIDs, res,
- )
- if err != nil {
- return nil, fmt.Errorf("d.addEDUDeltaToResponse: %w", err)
- }
-
- return res, nil
-}
-
-var txReadOnlySnapshot = sql.TxOptions{
- // Set the isolation level so that we see a snapshot of the database.
- // In PostgreSQL repeatable read transactions will see a snapshot taken
- // at the first query, and since the transaction is read-only it can't
- // run into any serialisation errors.
- // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ
- Isolation: sql.LevelRepeatableRead,
- ReadOnly: true,
-}
-
-func (d *Database) addInvitesToResponse(
- ctx context.Context, txn *sql.Tx,
- userID string,
- r types.Range,
- res *types.Response,
-) error {
- invites, retiredInvites, err := d.Invites.SelectInviteEventsInRange(
- ctx, txn, userID, r,
- )
- if err != nil {
- return fmt.Errorf("d.Invites.SelectInviteEventsInRange: %w", err)
- }
- for roomID, inviteEvent := range invites {
- ir := types.NewInviteResponse(inviteEvent)
- res.Rooms.Invite[roomID] = *ir
- }
- for roomID := range retiredInvites {
- if _, ok := res.Rooms.Join[roomID]; !ok {
- lr := types.NewLeaveResponse()
- res.Rooms.Leave[roomID] = *lr
- }
- }
- return nil
-}
-
// Retrieve the backward topology position, i.e. the position of the
// oldest event in the room's topology.
-func (d *Database) getBackwardTopologyPos(
- ctx context.Context, txn *sql.Tx,
+func (d *Database) GetBackwardTopologyPos(
+ ctx context.Context,
events []types.StreamEvent,
) (types.TopologyToken, error) {
zeroToken := types.TopologyToken{}
if len(events) == 0 {
return zeroToken, nil
}
- pos, spos, err := d.Topology.SelectPositionInTopology(ctx, txn, events[0].EventID())
+ pos, spos, err := d.Topology.SelectPositionInTopology(ctx, nil, events[0].EventID())
if err != nil {
return zeroToken, err
}
@@ -997,78 +575,6 @@ func (d *Database) getBackwardTopologyPos(
return tok, nil
}
-// addRoomDeltaToResponse adds a room state delta to a sync response
-func (d *Database) addRoomDeltaToResponse(
- ctx context.Context,
- device *userapi.Device,
- txn *sql.Tx,
- r types.Range,
- delta stateDelta,
- numRecentEventsPerRoom int,
- res *types.Response,
-) error {
- if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave {
- // make sure we don't leak recent events after the leave event.
- // TODO: History visibility makes this somewhat complex to handle correctly. For example:
- // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
- // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
- // in a single /sync request
- // This is all "okay" assuming history_visibility == "shared" which it is by default.
- r.To = delta.membershipPos
- }
- recentStreamEvents, limited, err := d.OutputEvents.SelectRecentEvents(
- ctx, txn, delta.roomID, r,
- numRecentEventsPerRoom, true, true,
- )
- if err != nil {
- return err
- }
- recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
- delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
- prevBatch, err := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents)
- if err != nil {
- return err
- }
-
- // XXX: should we ever get this far if we have no recent events or state in this room?
- // in practice we do for peeks, but possibly not joins?
- if len(recentEvents) == 0 && len(delta.stateEvents) == 0 {
- return nil
- }
-
- switch delta.membership {
- case gomatrixserverlib.Join:
- jr := types.NewJoinResponse()
-
- jr.Timeline.PrevBatch = &prevBatch
- jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- jr.Timeline.Limited = limited
- jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
- res.Rooms.Join[delta.roomID] = *jr
- case gomatrixserverlib.Peek:
- jr := types.NewJoinResponse()
-
- jr.Timeline.PrevBatch = &prevBatch
- jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- jr.Timeline.Limited = limited
- jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
- res.Rooms.Peek[delta.roomID] = *jr
- case gomatrixserverlib.Leave:
- fallthrough // transitions to leave are the same as ban
- case gomatrixserverlib.Ban:
- // TODO: recentEvents may contain events that this user is not allowed to see because they are
- // no longer in the room.
- lr := types.NewLeaveResponse()
- lr.Timeline.PrevBatch = &prevBatch
- lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
- lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
- res.Rooms.Leave[delta.roomID] = *lr
- }
-
- return nil
-}
-
// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database.
// Returns a map of room ID to list of events.
func (d *Database) fetchStateEvents(
@@ -1166,11 +672,11 @@ func (d *Database) fetchMissingStateEvents(
// the user has new membership events.
// A list of joined room IDs is also returned in case the caller needs it.
// nolint:gocyclo
-func (d *Database) getStateDeltas(
- ctx context.Context, device *userapi.Device, txn *sql.Tx,
+func (d *Database) GetStateDeltas(
+ ctx context.Context, device *userapi.Device,
r types.Range, userID string,
stateFilter *gomatrixserverlib.StateFilter,
-) ([]stateDelta, []string, error) {
+) ([]types.StateDelta, []string, error) {
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
// - Get membership list changes for this user in this sync response
// - For each room which has membership list changes:
@@ -1179,7 +685,14 @@ func (d *Database) getStateDeltas(
// * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
// * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block.
// - Get all CURRENTLY joined rooms, and add them to 'joined' block.
- var deltas []stateDelta
+ txn, err := d.readOnlySnapshot(ctx)
+ if err != nil {
+ return nil, nil, fmt.Errorf("d.readOnlySnapshot: %w", err)
+ }
+ var succeeded bool
+ defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
+
+ var deltas []types.StateDelta
// get all the state events ever (i.e. for all available rooms) between these two positions
stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter)
@@ -1210,10 +723,10 @@ func (d *Database) getStateDeltas(
state[peek.RoomID] = s
}
if !peek.Deleted {
- deltas = append(deltas, stateDelta{
- membership: gomatrixserverlib.Peek,
- stateEvents: d.StreamEventsToEvents(device, state[peek.RoomID]),
- roomID: peek.RoomID,
+ deltas = append(deltas, types.StateDelta{
+ Membership: gomatrixserverlib.Peek,
+ StateEvents: d.StreamEventsToEvents(device, state[peek.RoomID]),
+ RoomID: peek.RoomID,
})
}
}
@@ -1238,11 +751,11 @@ func (d *Database) getStateDeltas(
continue // we'll add this room in when we do joined rooms
}
- deltas = append(deltas, stateDelta{
- membership: membership,
- membershipPos: ev.StreamPosition,
- stateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
- roomID: roomID,
+ deltas = append(deltas, types.StateDelta{
+ Membership: membership,
+ MembershipPos: ev.StreamPosition,
+ StateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
+ RoomID: roomID,
})
break
}
@@ -1255,13 +768,14 @@ func (d *Database) getStateDeltas(
return nil, nil, err
}
for _, joinedRoomID := range joinedRoomIDs {
- deltas = append(deltas, stateDelta{
- membership: gomatrixserverlib.Join,
- stateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]),
- roomID: joinedRoomID,
+ deltas = append(deltas, types.StateDelta{
+ Membership: gomatrixserverlib.Join,
+ StateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]),
+ RoomID: joinedRoomID,
})
}
+ succeeded = true
return deltas, joinedRoomIDs, nil
}
@@ -1270,13 +784,20 @@ func (d *Database) getStateDeltas(
// Fetches full state for all joined rooms and uses selectStateInRange to get
// updates for other rooms.
// nolint:gocyclo
-func (d *Database) getStateDeltasForFullStateSync(
- ctx context.Context, device *userapi.Device, txn *sql.Tx,
+func (d *Database) GetStateDeltasForFullStateSync(
+ ctx context.Context, device *userapi.Device,
r types.Range, userID string,
stateFilter *gomatrixserverlib.StateFilter,
-) ([]stateDelta, []string, error) {
+) ([]types.StateDelta, []string, error) {
+ txn, err := d.readOnlySnapshot(ctx)
+ if err != nil {
+ return nil, nil, fmt.Errorf("d.readOnlySnapshot: %w", err)
+ }
+ var succeeded bool
+ defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
+
// Use a reasonable initial capacity
- deltas := make(map[string]stateDelta)
+ deltas := make(map[string]types.StateDelta)
peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, device.ID, r)
if err != nil {
@@ -1290,10 +811,10 @@ func (d *Database) getStateDeltasForFullStateSync(
if stateErr != nil {
return nil, nil, stateErr
}
- deltas[peek.RoomID] = stateDelta{
- membership: gomatrixserverlib.Peek,
- stateEvents: d.StreamEventsToEvents(device, s),
- roomID: peek.RoomID,
+ deltas[peek.RoomID] = types.StateDelta{
+ Membership: gomatrixserverlib.Peek,
+ StateEvents: d.StreamEventsToEvents(device, s),
+ RoomID: peek.RoomID,
}
}
}
@@ -1312,11 +833,11 @@ func (d *Database) getStateDeltasForFullStateSync(
for _, ev := range stateStreamEvents {
if membership := getMembershipFromEvent(ev.Event, userID); membership != "" {
if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above.
- deltas[roomID] = stateDelta{
- membership: membership,
- membershipPos: ev.StreamPosition,
- stateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
- roomID: roomID,
+ deltas[roomID] = types.StateDelta{
+ Membership: membership,
+ MembershipPos: ev.StreamPosition,
+ StateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
+ RoomID: roomID,
}
}
@@ -1336,21 +857,22 @@ func (d *Database) getStateDeltasForFullStateSync(
if stateErr != nil {
return nil, nil, stateErr
}
- deltas[joinedRoomID] = stateDelta{
- membership: gomatrixserverlib.Join,
- stateEvents: d.StreamEventsToEvents(device, s),
- roomID: joinedRoomID,
+ deltas[joinedRoomID] = types.StateDelta{
+ Membership: gomatrixserverlib.Join,
+ StateEvents: d.StreamEventsToEvents(device, s),
+ RoomID: joinedRoomID,
}
}
// Create a response array.
- result := make([]stateDelta, len(deltas))
+ result := make([]types.StateDelta, len(deltas))
i := 0
for _, delta := range deltas {
result[i] = delta
i++
}
+ succeeded = true
return result, joinedRoomIDs, nil
}
@@ -1470,31 +992,6 @@ func (d *Database) CleanSendToDeviceUpdates(
return
}
-// There may be some overlap where events in stateEvents are already in recentEvents, so filter
-// them out so we don't include them twice in the /sync response. They should be in recentEvents
-// only, so clients get to the correct state once they have rolled forward.
-func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
- for _, recentEv := range recentEvents {
- if recentEv.StateKey() == nil {
- continue // not a state event
- }
- // TODO: This is a linear scan over all the current state events in this room. This will
- // be slow for big rooms. We should instead sort the state events by event ID (ORDER BY)
- // then do a binary search to find matching events, similar to what roomserver does.
- for j := 0; j < len(stateEvents); j++ {
- if stateEvents[j].EventID() == recentEv.EventID() {
- // overwrite the element to remove with the last element then pop the last element.
- // This is orders of magnitude faster than re-slicing, but doesn't preserve ordering
- // (we don't care about the order of stateEvents)
- stateEvents[j] = stateEvents[len(stateEvents)-1]
- stateEvents = stateEvents[:len(stateEvents)-1]
- break // there shouldn't be multiple events with the same event ID
- }
- }
- }
- return stateEvents
-}
-
// getMembershipFromEvent returns the value of content.membership iff the event is a state event
// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
@@ -1508,15 +1005,6 @@ func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
return membership
}
-type stateDelta struct {
- roomID string
- stateEvents []*gomatrixserverlib.HeaderedEvent
- membership string
- // The PDU stream position of the latest membership event for this user, if applicable.
- // Can be 0 if there is no membership event in this delta.
- membershipPos types.StreamPosition
-}
-
// StoreReceipt stores user receipts
func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {