diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-09-30 12:48:10 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-30 12:48:10 +0100 |
commit | 6348486a1365c7469a498101f5035a9b6bd16d22 (patch) | |
tree | d8a5ba572c5fc4fdec383802de5fac3a5e13c24d /syncapi/streams | |
parent | 8a82f100460dc5ca7bd39ae2345c251d6622c494 (diff) |
Transactional isolation for `/sync` (#2745)
This should transactional snapshot isolation for `/sync` etc requests.
For now we don't use repeatable read due to some odd test failures with
invites.
Diffstat (limited to 'syncapi/streams')
-rw-r--r-- | syncapi/streams/stream_accountdata.go | 17 | ||||
-rw-r--r-- | syncapi/streams/stream_devicelist.go | 7 | ||||
-rw-r--r-- | syncapi/streams/stream_invite.go | 19 | ||||
-rw-r--r-- | syncapi/streams/stream_notificationdata.go | 20 | ||||
-rw-r--r-- | syncapi/streams/stream_pdu.go | 141 | ||||
-rw-r--r-- | syncapi/streams/stream_presence.go | 22 | ||||
-rw-r--r-- | syncapi/streams/stream_receipt.go | 20 | ||||
-rw-r--r-- | syncapi/streams/stream_sendtodevice.go | 20 | ||||
-rw-r--r-- | syncapi/streams/stream_typing.go | 7 | ||||
-rw-r--r-- | syncapi/streams/streamprovider.go | 28 | ||||
-rw-r--r-- | syncapi/streams/streams.go | 77 | ||||
-rw-r--r-- | syncapi/streams/template_stream.go | 10 |
12 files changed, 221 insertions, 167 deletions
diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go index 0297d5c2..3f2f7d13 100644 --- a/syncapi/streams/stream_accountdata.go +++ b/syncapi/streams/stream_accountdata.go @@ -5,22 +5,25 @@ import ( "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" ) type AccountDataStreamProvider struct { - StreamProvider + DefaultStreamProvider userAPI userapi.SyncUserAPI } -func (p *AccountDataStreamProvider) Setup() { - p.StreamProvider.Setup() +func (p *AccountDataStreamProvider) Setup( + ctx context.Context, snapshot storage.DatabaseTransaction, +) { + p.DefaultStreamProvider.Setup(ctx, snapshot) p.latestMutex.Lock() defer p.latestMutex.Unlock() - id, err := p.DB.MaxStreamPositionForAccountData(context.Background()) + id, err := snapshot.MaxStreamPositionForAccountData(ctx) if err != nil { panic(err) } @@ -29,13 +32,15 @@ func (p *AccountDataStreamProvider) Setup() { func (p *AccountDataStreamProvider) CompleteSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { - return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) + return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx)) } func (p *AccountDataStreamProvider) IncrementalSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition { @@ -44,7 +49,7 @@ func (p *AccountDataStreamProvider) IncrementalSync( To: to, } - dataTypes, pos, err := p.DB.GetAccountDataInRange( + dataTypes, pos, err := snapshot.GetAccountDataInRange( ctx, req.Device.UserID, r, &req.Filter.AccountData, ) if err != nil { diff --git a/syncapi/streams/stream_devicelist.go b/syncapi/streams/stream_devicelist.go index 5448ee5b..7996c203 100644 --- a/syncapi/streams/stream_devicelist.go +++ b/syncapi/streams/stream_devicelist.go @@ -6,17 +6,19 @@ import ( keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/internal" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" ) type DeviceListStreamProvider struct { - StreamProvider + DefaultStreamProvider rsAPI api.SyncRoomserverAPI keyAPI keyapi.SyncKeyAPI } func (p *DeviceListStreamProvider) CompleteSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { return p.LatestPosition(ctx) @@ -24,11 +26,12 @@ func (p *DeviceListStreamProvider) CompleteSync( func (p *DeviceListStreamProvider) IncrementalSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition { var err error - to, _, err = internal.DeviceListCatchup(context.Background(), p.DB, p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to) + to, _, err = internal.DeviceListCatchup(context.Background(), snapshot, p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to) if err != nil { req.Log.WithError(err).Error("internal.DeviceListCatchup failed") return from diff --git a/syncapi/streams/stream_invite.go b/syncapi/streams/stream_invite.go index 925da32f..17b3b843 100644 --- a/syncapi/streams/stream_invite.go +++ b/syncapi/streams/stream_invite.go @@ -9,20 +9,23 @@ import ( "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" ) type InviteStreamProvider struct { - StreamProvider + DefaultStreamProvider } -func (p *InviteStreamProvider) Setup() { - p.StreamProvider.Setup() +func (p *InviteStreamProvider) Setup( + ctx context.Context, snapshot storage.DatabaseTransaction, +) { + p.DefaultStreamProvider.Setup(ctx, snapshot) p.latestMutex.Lock() defer p.latestMutex.Unlock() - id, err := p.DB.MaxStreamPositionForInvites(context.Background()) + id, err := snapshot.MaxStreamPositionForInvites(ctx) if err != nil { panic(err) } @@ -31,13 +34,15 @@ func (p *InviteStreamProvider) Setup() { func (p *InviteStreamProvider) CompleteSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { - return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) + return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx)) } func (p *InviteStreamProvider) IncrementalSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition { @@ -46,7 +51,7 @@ func (p *InviteStreamProvider) IncrementalSync( To: to, } - invites, retiredInvites, err := p.DB.InviteEventsInRange( + invites, retiredInvites, maxID, err := snapshot.InviteEventsInRange( ctx, req.Device.UserID, r, ) if err != nil { @@ -86,5 +91,5 @@ func (p *InviteStreamProvider) IncrementalSync( } } - return to + return maxID } diff --git a/syncapi/streams/stream_notificationdata.go b/syncapi/streams/stream_notificationdata.go index 33872734..5a81fd09 100644 --- a/syncapi/streams/stream_notificationdata.go +++ b/syncapi/streams/stream_notificationdata.go @@ -3,17 +3,23 @@ package streams import ( "context" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" ) type NotificationDataStreamProvider struct { - StreamProvider + DefaultStreamProvider } -func (p *NotificationDataStreamProvider) Setup() { - p.StreamProvider.Setup() +func (p *NotificationDataStreamProvider) Setup( + ctx context.Context, snapshot storage.DatabaseTransaction, +) { + p.DefaultStreamProvider.Setup(ctx, snapshot) - id, err := p.DB.MaxStreamPositionForNotificationData(context.Background()) + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + id, err := snapshot.MaxStreamPositionForNotificationData(ctx) if err != nil { panic(err) } @@ -22,20 +28,22 @@ func (p *NotificationDataStreamProvider) Setup() { func (p *NotificationDataStreamProvider) CompleteSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { - return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) + return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx)) } func (p *NotificationDataStreamProvider) IncrementalSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, _ types.StreamPosition, ) types.StreamPosition { // Get the unread notifications for rooms in our join response. // This is to ensure clients always have an unread notification section // and can display the correct numbers. - countsByRoom, err := p.DB.GetUserUnreadNotificationCountsForRooms(ctx, req.Device.UserID, req.Rooms) + countsByRoom, err := snapshot.GetUserUnreadNotificationCountsForRooms(ctx, req.Device.UserID, req.Rooms) if err != nil { req.Log.WithError(err).Error("GetUserUnreadNotificationCountsForRooms failed") return from diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 0ab6de88..89c5ba35 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -5,7 +5,6 @@ import ( "database/sql" "fmt" "sort" - "sync" "time" "github.com/matrix-org/dendrite/internal/caching" @@ -18,7 +17,6 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" "github.com/tidwall/gjson" - "go.uber.org/atomic" "github.com/matrix-org/dendrite/syncapi/notifier" ) @@ -33,44 +31,23 @@ const PDU_STREAM_WORKERS = 256 const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8 type PDUStreamProvider struct { - StreamProvider + DefaultStreamProvider - tasks chan func() - workers atomic.Int32 // userID+deviceID -> lazy loading cache lazyLoadCache caching.LazyLoadCache rsAPI roomserverAPI.SyncRoomserverAPI notifier *notifier.Notifier } -func (p *PDUStreamProvider) worker() { - defer p.workers.Dec() - for { - select { - case f := <-p.tasks: - f() - case <-time.After(time.Second * 10): - return - } - } -} - -func (p *PDUStreamProvider) queue(f func()) { - if p.workers.Load() < PDU_STREAM_WORKERS { - p.workers.Inc() - go p.worker() - } - p.tasks <- f -} - -func (p *PDUStreamProvider) Setup() { - p.StreamProvider.Setup() - p.tasks = make(chan func(), PDU_STREAM_QUEUESIZE) +func (p *PDUStreamProvider) Setup( + ctx context.Context, snapshot storage.DatabaseTransaction, +) { + p.DefaultStreamProvider.Setup(ctx, snapshot) p.latestMutex.Lock() defer p.latestMutex.Unlock() - id, err := p.DB.MaxStreamPositionForPDUs(context.Background()) + id, err := snapshot.MaxStreamPositionForPDUs(ctx) if err != nil { panic(err) } @@ -79,6 +56,7 @@ func (p *PDUStreamProvider) Setup() { func (p *PDUStreamProvider) CompleteSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { from := types.StreamPosition(0) @@ -94,7 +72,7 @@ func (p *PDUStreamProvider) CompleteSync( } // Extract room state and recent events for all rooms the user is joined to. - joinedRoomIDs, err := p.DB.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join) + joinedRoomIDs, err := snapshot.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join) if err != nil { req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed") return from @@ -103,7 +81,7 @@ func (p *PDUStreamProvider) CompleteSync( stateFilter := req.Filter.Room.State eventFilter := req.Filter.Room.Timeline - if err = p.addIgnoredUsersToFilter(ctx, req, &eventFilter); err != nil { + if err = p.addIgnoredUsersToFilter(ctx, snapshot, req, &eventFilter); err != nil { req.Log.WithError(err).Error("unable to update event filter with ignored users") } @@ -117,33 +95,20 @@ func (p *PDUStreamProvider) CompleteSync( } // Build up a /sync response. Add joined rooms. - var reqMutex sync.Mutex - var reqWaitGroup sync.WaitGroup - reqWaitGroup.Add(len(joinedRoomIDs)) - for _, room := range joinedRoomIDs { - roomID := room - p.queue(func() { - defer reqWaitGroup.Done() - - jr, jerr := p.getJoinResponseForCompleteSync( - ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false, - ) - if jerr != nil { - req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed") - return - } - - reqMutex.Lock() - defer reqMutex.Unlock() - req.Response.Rooms.Join[roomID] = *jr - req.Rooms[roomID] = gomatrixserverlib.Join - }) + for _, roomID := range joinedRoomIDs { + jr, jerr := p.getJoinResponseForCompleteSync( + ctx, snapshot, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false, + ) + if jerr != nil { + req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed") + continue // return from + } + req.Response.Rooms.Join[roomID] = *jr + req.Rooms[roomID] = gomatrixserverlib.Join } - reqWaitGroup.Wait() - // Add peeked rooms. - peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r) + peeks, err := snapshot.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r) if err != nil { req.Log.WithError(err).Error("p.DB.PeeksInRange failed") return from @@ -152,11 +117,11 @@ func (p *PDUStreamProvider) CompleteSync( if !peek.Deleted { var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( - ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, true, + ctx, snapshot, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, true, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") - return from + continue // return from } req.Response.Rooms.Peek[peek.RoomID] = *jr } @@ -167,6 +132,7 @@ func (p *PDUStreamProvider) CompleteSync( func (p *PDUStreamProvider) IncrementalSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) (newPos types.StreamPosition) { @@ -184,12 +150,12 @@ func (p *PDUStreamProvider) IncrementalSync( eventFilter := req.Filter.Room.Timeline if req.WantFullState { - if stateDeltas, syncJoinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { + if stateDeltas, syncJoinedRooms, err = snapshot.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { req.Log.WithError(err).Error("p.DB.GetStateDeltasForFullStateSync failed") return } } else { - if stateDeltas, syncJoinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { + if stateDeltas, syncJoinedRooms, err = snapshot.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { req.Log.WithError(err).Error("p.DB.GetStateDeltas failed") return } @@ -203,7 +169,7 @@ func (p *PDUStreamProvider) IncrementalSync( return to } - if err = p.addIgnoredUsersToFilter(ctx, req, &eventFilter); err != nil { + if err = p.addIgnoredUsersToFilter(ctx, snapshot, req, &eventFilter); err != nil { req.Log.WithError(err).Error("unable to update event filter with ignored users") } @@ -222,7 +188,7 @@ func (p *PDUStreamProvider) IncrementalSync( } } var pos types.StreamPosition - if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil { + if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") return to } @@ -244,6 +210,7 @@ func (p *PDUStreamProvider) IncrementalSync( // nolint:gocyclo func (p *PDUStreamProvider) addRoomDeltaToResponse( ctx context.Context, + snapshot storage.DatabaseTransaction, device *userapi.Device, r types.Range, delta types.StateDelta, @@ -260,7 +227,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( // This is all "okay" assuming history_visibility == "shared" which it is by default. r.To = delta.MembershipPos } - recentStreamEvents, limited, err := p.DB.RecentEvents( + recentStreamEvents, limited, err := snapshot.RecentEvents( ctx, delta.RoomID, r, eventFilter, true, true, ) @@ -270,9 +237,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( } return r.From, fmt.Errorf("p.DB.RecentEvents: %w", err) } - recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) + recentEvents := snapshot.StreamEventsToEvents(device, recentStreamEvents) delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back - prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents) + prevBatch, err := snapshot.GetBackwardTopologyPos(ctx, recentStreamEvents) if err != nil { return r.From, fmt.Errorf("p.DB.GetBackwardTopologyPos: %w", err) } @@ -291,7 +258,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( latestPosition := r.To updateLatestPosition := func(mostRecentEventID string) { var pos types.StreamPosition - if _, pos, err = p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil { + if _, pos, err = snapshot.PositionInTopology(ctx, mostRecentEventID); err == nil { switch { case r.Backwards && pos < latestPosition: fallthrough @@ -303,7 +270,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( if stateFilter.LazyLoadMembers { delta.StateEvents, err = p.lazyLoadMembers( - ctx, delta.RoomID, true, limited, stateFilter, + ctx, snapshot, delta.RoomID, true, limited, stateFilter, device, recentEvents, delta.StateEvents, ) if err != nil && err != sql.ErrNoRows { @@ -320,7 +287,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( } // Applies the history visibility rules - events, err := applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, delta.RoomID, device.UserID, eventFilter.Limit, recentEvents) + events, err := applyHistoryVisibilityFilter(ctx, snapshot, p.rsAPI, delta.RoomID, device.UserID, eventFilter.Limit, recentEvents) if err != nil { logrus.WithError(err).Error("unable to apply history visibility filter") } @@ -336,7 +303,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( case gomatrixserverlib.Join: jr := types.NewJoinResponse() if hasMembershipChange { - p.addRoomSummary(ctx, jr, delta.RoomID, device.UserID, latestPosition) + p.addRoomSummary(ctx, snapshot, jr, delta.RoomID, device.UserID, latestPosition) } jr.Timeline.PrevBatch = &prevBatch jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync) @@ -376,7 +343,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( // sure we always return the required events in the timeline. func applyHistoryVisibilityFilter( ctx context.Context, - db storage.Database, + snapshot storage.DatabaseTransaction, rsAPI roomserverAPI.SyncRoomserverAPI, roomID, userID string, limit int, @@ -384,7 +351,7 @@ func applyHistoryVisibilityFilter( ) ([]*gomatrixserverlib.HeaderedEvent, error) { // We need to make sure we always include the latest states events, if they are in the timeline. // We grep at least limit * 2 events, to ensure we really get the needed events. - stateEvents, err := db.CurrentState(ctx, roomID, &gomatrixserverlib.StateFilter{Limit: limit * 2}, nil) + stateEvents, err := snapshot.CurrentState(ctx, roomID, &gomatrixserverlib.StateFilter{Limit: limit * 2}, nil) if err != nil { // Not a fatal error, we can continue without the stateEvents, // they are only needed if there are state events in the timeline. @@ -395,7 +362,7 @@ func applyHistoryVisibilityFilter( alwaysIncludeIDs[ev.EventID()] = struct{}{} } startTime := time.Now() - events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync") + events, err := internal.ApplyHistoryVisibilityFilter(ctx, snapshot, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync") if err != nil { return nil, err } @@ -408,10 +375,10 @@ func applyHistoryVisibilityFilter( return events, nil } -func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) { +func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, snapshot storage.DatabaseTransaction, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) { // Work out how many members are in the room. - joinedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Join, latestPosition) - invitedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Invite, latestPosition) + joinedCount, _ := snapshot.MembershipCount(ctx, roomID, gomatrixserverlib.Join, latestPosition) + invitedCount, _ := snapshot.MembershipCount(ctx, roomID, gomatrixserverlib.Invite, latestPosition) jr.Summary.JoinedMemberCount = &joinedCount jr.Summary.InvitedMemberCount = &invitedCount @@ -439,7 +406,7 @@ func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinRe } } } - heroes, err := p.DB.GetRoomHeroes(ctx, roomID, userID, []string{"join", "invite"}) + heroes, err := snapshot.GetRoomHeroes(ctx, roomID, userID, []string{"join", "invite"}) if err != nil { return } @@ -449,6 +416,7 @@ func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinRe func (p *PDUStreamProvider) getJoinResponseForCompleteSync( ctx context.Context, + snapshot storage.DatabaseTransaction, roomID string, r types.Range, stateFilter *gomatrixserverlib.StateFilter, @@ -460,7 +428,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( jr = types.NewJoinResponse() // TODO: When filters are added, we may need to call this multiple times to get enough events. // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 - recentStreamEvents, limited, err := p.DB.RecentEvents( + recentStreamEvents, limited, err := snapshot.RecentEvents( ctx, roomID, r, eventFilter, true, true, ) if err != nil { @@ -484,7 +452,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( } } - stateEvents, err := p.DB.CurrentState(ctx, roomID, stateFilter, excludingEventIDs) + stateEvents, err := snapshot.CurrentState(ctx, roomID, stateFilter, excludingEventIDs) if err != nil { return } @@ -494,7 +462,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( var prevBatch *types.TopologyToken if len(recentStreamEvents) > 0 { var backwardTopologyPos, backwardStreamPos types.StreamPosition - backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, recentStreamEvents[0].EventID()) + backwardTopologyPos, backwardStreamPos, err = snapshot.PositionInTopology(ctx, recentStreamEvents[0].EventID()) if err != nil { return } @@ -505,18 +473,18 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( prevBatch.Decrement() } - p.addRoomSummary(ctx, jr, roomID, device.UserID, r.From) + p.addRoomSummary(ctx, snapshot, jr, roomID, device.UserID, r.From) // We don't include a device here as we don't need to send down // transaction IDs for complete syncs, but we do it anyway because Sytest demands it for: // "Can sync a room with a message with a transaction id" - which does a complete sync to check. - recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) + recentEvents := snapshot.StreamEventsToEvents(device, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) events := recentEvents // Only apply history visibility checks if the response is for joined rooms if !isPeek { - events, err = applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, roomID, device.UserID, eventFilter.Limit, recentEvents) + events, err = applyHistoryVisibilityFilter(ctx, snapshot, p.rsAPI, roomID, device.UserID, eventFilter.Limit, recentEvents) if err != nil { logrus.WithError(err).Error("unable to apply history visibility filter") } @@ -530,7 +498,8 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( if err != nil { return nil, err } - stateEvents, err = p.lazyLoadMembers(ctx, roomID, + stateEvents, err = p.lazyLoadMembers( + ctx, snapshot, roomID, false, limited, stateFilter, device, recentEvents, stateEvents, ) @@ -549,7 +518,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( } func (p *PDUStreamProvider) lazyLoadMembers( - ctx context.Context, roomID string, + ctx context.Context, snapshot storage.DatabaseTransaction, roomID string, incremental, limited bool, stateFilter *gomatrixserverlib.StateFilter, device *userapi.Device, timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent, @@ -598,7 +567,7 @@ func (p *PDUStreamProvider) lazyLoadMembers( filter.Limit = stateFilter.Limit filter.Senders = &wantUsers filter.Types = &[]string{gomatrixserverlib.MRoomMember} - memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &filter) + memberships, err := snapshot.GetStateEventsForRoom(ctx, roomID, &filter) if err != nil { return stateEvents, err } @@ -612,8 +581,8 @@ func (p *PDUStreamProvider) lazyLoadMembers( // addIgnoredUsersToFilter adds ignored users to the eventfilter and // the syncreq itself for further use in streams. -func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error { - ignores, err := p.DB.IgnoresForUser(ctx, req.Device.UserID) +func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error { + ignores, err := snapshot.IgnoresForUser(ctx, req.Device.UserID) if err != nil { if err == sql.ErrNoRows { return nil diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go index 15db4d30..81cea7d5 100644 --- a/syncapi/streams/stream_presence.go +++ b/syncapi/streams/stream_presence.go @@ -23,20 +23,26 @@ import ( "github.com/tidwall/gjson" "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" ) type PresenceStreamProvider struct { - StreamProvider + DefaultStreamProvider // cache contains previously sent presence updates to avoid unneeded updates cache sync.Map notifier *notifier.Notifier } -func (p *PresenceStreamProvider) Setup() { - p.StreamProvider.Setup() +func (p *PresenceStreamProvider) Setup( + ctx context.Context, snapshot storage.DatabaseTransaction, +) { + p.DefaultStreamProvider.Setup(ctx, snapshot) - id, err := p.DB.MaxStreamPositionForPresence(context.Background()) + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + id, err := snapshot.MaxStreamPositionForPresence(ctx) if err != nil { panic(err) } @@ -45,18 +51,20 @@ func (p *PresenceStreamProvider) Setup() { func (p *PresenceStreamProvider) CompleteSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { - return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) + return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx)) } func (p *PresenceStreamProvider) IncrementalSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition { // We pull out a larger number than the filter asks for, since we're filtering out events later - presences, err := p.DB.PresenceAfter(ctx, from, gomatrixserverlib.EventFilter{Limit: 1000}) + presences, err := snapshot.PresenceAfter(ctx, from, gomatrixserverlib.EventFilter{Limit: 1000}) if err != nil { req.Log.WithError(err).Error("p.DB.PresenceAfter failed") return from @@ -84,7 +92,7 @@ func (p *PresenceStreamProvider) IncrementalSync( } // Bear in mind that this might return nil, but at least populating // a nil means that there's a map entry so we won't repeat this call. - presences[roomUsers[i]], err = p.DB.GetPresence(ctx, roomUsers[i]) + presences[roomUsers[i]], err = snapshot.GetPresence(ctx, roomUsers[i]) if err != nil { req.Log.WithError(err).Error("unable to query presence for user") return from diff --git a/syncapi/streams/stream_receipt.go b/syncapi/streams/stream_receipt.go index f4e84c7d..8818a553 100644 --- a/syncapi/streams/stream_receipt.go +++ b/syncapi/streams/stream_receipt.go @@ -4,18 +4,24 @@ import ( "context" "encoding/json" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) type ReceiptStreamProvider struct { - StreamProvider + DefaultStreamProvider } -func (p *ReceiptStreamProvider) Setup() { - p.StreamProvider.Setup() +func (p *ReceiptStreamProvider) Setup( + ctx context.Context, snapshot storage.DatabaseTransaction, +) { + p.DefaultStreamProvider.Setup(ctx, snapshot) - id, err := p.DB.MaxStreamPositionForReceipts(context.Background()) + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + id, err := snapshot.MaxStreamPositionForReceipts(ctx) if err != nil { panic(err) } @@ -24,13 +30,15 @@ func (p *ReceiptStreamProvider) Setup() { func (p *ReceiptStreamProvider) CompleteSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { - return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) + return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx)) } func (p *ReceiptStreamProvider) IncrementalSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition { @@ -41,7 +49,7 @@ func (p *ReceiptStreamProvider) IncrementalSync( } } - lastPos, receipts, err := p.DB.RoomReceiptsAfter(ctx, joinedRooms, from) + lastPos, receipts, err := snapshot.RoomReceiptsAfter(ctx, joinedRooms, from) if err != nil { req.Log.WithError(err).Error("p.DB.RoomReceiptsAfter failed") return from diff --git a/syncapi/streams/stream_sendtodevice.go b/syncapi/streams/stream_sendtodevice.go index 31c6187c..00b67cc4 100644 --- a/syncapi/streams/stream_sendtodevice.go +++ b/syncapi/streams/stream_sendtodevice.go @@ -3,17 +3,23 @@ package streams import ( "context" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" ) type SendToDeviceStreamProvider struct { - StreamProvider + DefaultStreamProvider } -func (p *SendToDeviceStreamProvider) Setup() { - p.StreamProvider.Setup() +func (p *SendToDeviceStreamProvider) Setup( + ctx context.Context, snapshot storage.DatabaseTransaction, +) { + p.DefaultStreamProvider.Setup(ctx, snapshot) - id, err := p.DB.MaxStreamPositionForSendToDeviceMessages(context.Background()) + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + id, err := snapshot.MaxStreamPositionForSendToDeviceMessages(ctx) if err != nil { panic(err) } @@ -22,18 +28,20 @@ func (p *SendToDeviceStreamProvider) Setup() { func (p *SendToDeviceStreamProvider) CompleteSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { - return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) + return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx)) } func (p *SendToDeviceStreamProvider) IncrementalSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition { // See if we have any new tasks to do for the send-to-device messaging. - lastPos, events, err := p.DB.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, from, to) + lastPos, events, err := snapshot.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, from, to) if err != nil { req.Log.WithError(err).Error("p.DB.SendToDeviceUpdatesForSync failed") return from diff --git a/syncapi/streams/stream_typing.go b/syncapi/streams/stream_typing.go index f781065b..a6f7c7a0 100644 --- a/syncapi/streams/stream_typing.go +++ b/syncapi/streams/stream_typing.go @@ -5,24 +5,27 @@ import ( "encoding/json" "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) type TypingStreamProvider struct { - StreamProvider + DefaultStreamProvider EDUCache *caching.EDUCache } func (p *TypingStreamProvider) CompleteSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { - return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) + return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx)) } func (p *TypingStreamProvider) IncrementalSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition { diff --git a/syncapi/streams/streamprovider.go b/syncapi/streams/streamprovider.go new file mode 100644 index 00000000..8b12e2eb --- /dev/null +++ b/syncapi/streams/streamprovider.go @@ -0,0 +1,28 @@ +package streams + +import ( + "context" + + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" +) + +type StreamProvider interface { + Setup(ctx context.Context, snapshot storage.DatabaseTransaction) + + // Advance will update the latest position of the stream based on + // an update and will wake callers waiting on StreamNotifyAfter. + Advance(latest types.StreamPosition) + + // CompleteSync will update the response to include all updates as needed + // for a complete sync. It will always return immediately. + CompleteSync(ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest) types.StreamPosition + + // IncrementalSync will update the response to include all updates between + // the from and to sync positions. It will always return immediately, + // making no changes if the range contains no updates. + IncrementalSync(ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition) types.StreamPosition + + // LatestPosition returns the latest stream position for this stream. + LatestPosition(ctx context.Context) types.StreamPosition +} diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index dbc053bd..eccbb3a4 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -13,15 +13,15 @@ import ( ) type Streams struct { - PDUStreamProvider types.StreamProvider - TypingStreamProvider types.StreamProvider - ReceiptStreamProvider types.StreamProvider - InviteStreamProvider types.StreamProvider - SendToDeviceStreamProvider types.StreamProvider - AccountDataStreamProvider types.StreamProvider - DeviceListStreamProvider types.StreamProvider - NotificationDataStreamProvider types.StreamProvider - PresenceStreamProvider types.StreamProvider + PDUStreamProvider StreamProvider + TypingStreamProvider StreamProvider + ReceiptStreamProvider StreamProvider + InviteStreamProvider StreamProvider + SendToDeviceStreamProvider StreamProvider + AccountDataStreamProvider StreamProvider + DeviceListStreamProvider StreamProvider + NotificationDataStreamProvider StreamProvider + PresenceStreamProvider StreamProvider } func NewSyncStreamProviders( @@ -31,51 +31,58 @@ func NewSyncStreamProviders( ) *Streams { streams := &Streams{ PDUStreamProvider: &PDUStreamProvider{ - StreamProvider: StreamProvider{DB: d}, - lazyLoadCache: lazyLoadCache, - rsAPI: rsAPI, - notifier: notifier, + DefaultStreamProvider: DefaultStreamProvider{DB: d}, + lazyLoadCache: lazyLoadCache, + rsAPI: rsAPI, + notifier: notifier, }, TypingStreamProvider: &TypingStreamProvider{ - StreamProvider: StreamProvider{DB: d}, - EDUCache: eduCache, + DefaultStreamProvider: DefaultStreamProvider{DB: d}, + EDUCache: eduCache, }, ReceiptStreamProvider: &ReceiptStreamProvider{ - StreamProvider: StreamProvider{DB: d}, + DefaultStreamProvider: DefaultStreamProvider{DB: d}, }, InviteStreamProvider: &InviteStreamProvider{ - StreamProvider: StreamProvider{DB: d}, + DefaultStreamProvider: DefaultStreamProvider{DB: d}, }, SendToDeviceStreamProvider: &SendToDeviceStreamProvider{ - StreamProvider: StreamProvider{DB: d}, + DefaultStreamProvider: DefaultStreamProvider{DB: d}, }, AccountDataStreamProvider: &AccountDataStreamProvider{ - StreamProvider: StreamProvider{DB: d}, - userAPI: userAPI, + DefaultStreamProvider: DefaultStreamProvider{DB: d}, + userAPI: userAPI, }, NotificationDataStreamProvider: &NotificationDataStreamProvider{ - StreamProvider: StreamProvider{DB: d}, + DefaultStreamProvider: DefaultStreamProvider{DB: d}, }, DeviceListStreamProvider: &DeviceListStreamProvider{ - StreamProvider: StreamProvider{DB: d}, - rsAPI: rsAPI, - keyAPI: keyAPI, + DefaultStreamProvider: DefaultStreamProvider{DB: d}, + rsAPI: rsAPI, + keyAPI: keyAPI, }, PresenceStreamProvider: &PresenceStreamProvider{ - StreamProvider: StreamProvider{DB: d}, - notifier: notifier, + DefaultStreamProvider: DefaultStreamProvider{DB: d}, + notifier: notifier, }, } - streams.PDUStreamProvider.Setup() - streams.TypingStreamProvider.Setup() - streams.ReceiptStreamProvider.Setup() - streams.InviteStreamProvider.Setup() - streams.SendToDeviceStreamProvider.Setup() - streams.AccountDataStreamProvider.Setup() - streams.NotificationDataStreamProvider.Setup() - streams.DeviceListStreamProvider.Setup() - streams.PresenceStreamProvider.Setup() + ctx := context.TODO() + snapshot, err := d.NewDatabaseSnapshot(ctx) + if err != nil { + panic(err) + } + defer snapshot.Rollback() // nolint:errcheck + + streams.PDUStreamProvider.Setup(ctx, snapshot) + streams.TypingStreamProvider.Setup(ctx, snapshot) + streams.ReceiptStreamProvider.Setup(ctx, snapshot) + streams.InviteStreamProvider.Setup(ctx, snapshot) + streams.SendToDeviceStreamProvider.Setup(ctx, snapshot) + streams.AccountDataStreamProvider.Setup(ctx, snapshot) + streams.NotificationDataStreamProvider.Setup(ctx, snapshot) + streams.DeviceListStreamProvider.Setup(ctx, snapshot) + streams.PresenceStreamProvider.Setup(ctx, snapshot) return streams } diff --git a/syncapi/streams/template_stream.go b/syncapi/streams/template_stream.go index 15074cc1..f208d84e 100644 --- a/syncapi/streams/template_stream.go +++ b/syncapi/streams/template_stream.go @@ -8,16 +8,18 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" ) -type StreamProvider struct { +type DefaultStreamProvider struct { DB storage.Database latest types.StreamPosition latestMutex sync.RWMutex } -func (p *StreamProvider) Setup() { +func (p *DefaultStreamProvider) Setup( + ctx context.Context, snapshot storage.DatabaseTransaction, +) { } -func (p *StreamProvider) Advance( +func (p *DefaultStreamProvider) Advance( latest types.StreamPosition, ) { p.latestMutex.Lock() @@ -28,7 +30,7 @@ func (p *StreamProvider) Advance( } } -func (p *StreamProvider) LatestPosition( +func (p *DefaultStreamProvider) LatestPosition( ctx context.Context, ) types.StreamPosition { p.latestMutex.RLock() |