aboutsummaryrefslogtreecommitdiff
path: root/syncapi/streams
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-09-30 12:48:10 +0100
committerGitHub <noreply@github.com>2022-09-30 12:48:10 +0100
commit6348486a1365c7469a498101f5035a9b6bd16d22 (patch)
treed8a5ba572c5fc4fdec383802de5fac3a5e13c24d /syncapi/streams
parent8a82f100460dc5ca7bd39ae2345c251d6622c494 (diff)
Transactional isolation for `/sync` (#2745)
This should transactional snapshot isolation for `/sync` etc requests. For now we don't use repeatable read due to some odd test failures with invites.
Diffstat (limited to 'syncapi/streams')
-rw-r--r--syncapi/streams/stream_accountdata.go17
-rw-r--r--syncapi/streams/stream_devicelist.go7
-rw-r--r--syncapi/streams/stream_invite.go19
-rw-r--r--syncapi/streams/stream_notificationdata.go20
-rw-r--r--syncapi/streams/stream_pdu.go141
-rw-r--r--syncapi/streams/stream_presence.go22
-rw-r--r--syncapi/streams/stream_receipt.go20
-rw-r--r--syncapi/streams/stream_sendtodevice.go20
-rw-r--r--syncapi/streams/stream_typing.go7
-rw-r--r--syncapi/streams/streamprovider.go28
-rw-r--r--syncapi/streams/streams.go77
-rw-r--r--syncapi/streams/template_stream.go10
12 files changed, 221 insertions, 167 deletions
diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go
index 0297d5c2..3f2f7d13 100644
--- a/syncapi/streams/stream_accountdata.go
+++ b/syncapi/streams/stream_accountdata.go
@@ -5,22 +5,25 @@ import (
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
type AccountDataStreamProvider struct {
- StreamProvider
+ DefaultStreamProvider
userAPI userapi.SyncUserAPI
}
-func (p *AccountDataStreamProvider) Setup() {
- p.StreamProvider.Setup()
+func (p *AccountDataStreamProvider) Setup(
+ ctx context.Context, snapshot storage.DatabaseTransaction,
+) {
+ p.DefaultStreamProvider.Setup(ctx, snapshot)
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
- id, err := p.DB.MaxStreamPositionForAccountData(context.Background())
+ id, err := snapshot.MaxStreamPositionForAccountData(ctx)
if err != nil {
panic(err)
}
@@ -29,13 +32,15 @@ func (p *AccountDataStreamProvider) Setup() {
func (p *AccountDataStreamProvider) CompleteSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
) types.StreamPosition {
- return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
+ return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx))
}
func (p *AccountDataStreamProvider) IncrementalSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
@@ -44,7 +49,7 @@ func (p *AccountDataStreamProvider) IncrementalSync(
To: to,
}
- dataTypes, pos, err := p.DB.GetAccountDataInRange(
+ dataTypes, pos, err := snapshot.GetAccountDataInRange(
ctx, req.Device.UserID, r, &req.Filter.AccountData,
)
if err != nil {
diff --git a/syncapi/streams/stream_devicelist.go b/syncapi/streams/stream_devicelist.go
index 5448ee5b..7996c203 100644
--- a/syncapi/streams/stream_devicelist.go
+++ b/syncapi/streams/stream_devicelist.go
@@ -6,17 +6,19 @@ import (
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/internal"
+ "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
)
type DeviceListStreamProvider struct {
- StreamProvider
+ DefaultStreamProvider
rsAPI api.SyncRoomserverAPI
keyAPI keyapi.SyncKeyAPI
}
func (p *DeviceListStreamProvider) CompleteSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
) types.StreamPosition {
return p.LatestPosition(ctx)
@@ -24,11 +26,12 @@ func (p *DeviceListStreamProvider) CompleteSync(
func (p *DeviceListStreamProvider) IncrementalSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
var err error
- to, _, err = internal.DeviceListCatchup(context.Background(), p.DB, p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to)
+ to, _, err = internal.DeviceListCatchup(context.Background(), snapshot, p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to)
if err != nil {
req.Log.WithError(err).Error("internal.DeviceListCatchup failed")
return from
diff --git a/syncapi/streams/stream_invite.go b/syncapi/streams/stream_invite.go
index 925da32f..17b3b843 100644
--- a/syncapi/streams/stream_invite.go
+++ b/syncapi/streams/stream_invite.go
@@ -9,20 +9,23 @@ import (
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
)
type InviteStreamProvider struct {
- StreamProvider
+ DefaultStreamProvider
}
-func (p *InviteStreamProvider) Setup() {
- p.StreamProvider.Setup()
+func (p *InviteStreamProvider) Setup(
+ ctx context.Context, snapshot storage.DatabaseTransaction,
+) {
+ p.DefaultStreamProvider.Setup(ctx, snapshot)
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
- id, err := p.DB.MaxStreamPositionForInvites(context.Background())
+ id, err := snapshot.MaxStreamPositionForInvites(ctx)
if err != nil {
panic(err)
}
@@ -31,13 +34,15 @@ func (p *InviteStreamProvider) Setup() {
func (p *InviteStreamProvider) CompleteSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
) types.StreamPosition {
- return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
+ return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx))
}
func (p *InviteStreamProvider) IncrementalSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
@@ -46,7 +51,7 @@ func (p *InviteStreamProvider) IncrementalSync(
To: to,
}
- invites, retiredInvites, err := p.DB.InviteEventsInRange(
+ invites, retiredInvites, maxID, err := snapshot.InviteEventsInRange(
ctx, req.Device.UserID, r,
)
if err != nil {
@@ -86,5 +91,5 @@ func (p *InviteStreamProvider) IncrementalSync(
}
}
- return to
+ return maxID
}
diff --git a/syncapi/streams/stream_notificationdata.go b/syncapi/streams/stream_notificationdata.go
index 33872734..5a81fd09 100644
--- a/syncapi/streams/stream_notificationdata.go
+++ b/syncapi/streams/stream_notificationdata.go
@@ -3,17 +3,23 @@ package streams
import (
"context"
+ "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
)
type NotificationDataStreamProvider struct {
- StreamProvider
+ DefaultStreamProvider
}
-func (p *NotificationDataStreamProvider) Setup() {
- p.StreamProvider.Setup()
+func (p *NotificationDataStreamProvider) Setup(
+ ctx context.Context, snapshot storage.DatabaseTransaction,
+) {
+ p.DefaultStreamProvider.Setup(ctx, snapshot)
- id, err := p.DB.MaxStreamPositionForNotificationData(context.Background())
+ p.latestMutex.Lock()
+ defer p.latestMutex.Unlock()
+
+ id, err := snapshot.MaxStreamPositionForNotificationData(ctx)
if err != nil {
panic(err)
}
@@ -22,20 +28,22 @@ func (p *NotificationDataStreamProvider) Setup() {
func (p *NotificationDataStreamProvider) CompleteSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
) types.StreamPosition {
- return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
+ return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx))
}
func (p *NotificationDataStreamProvider) IncrementalSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
from, _ types.StreamPosition,
) types.StreamPosition {
// Get the unread notifications for rooms in our join response.
// This is to ensure clients always have an unread notification section
// and can display the correct numbers.
- countsByRoom, err := p.DB.GetUserUnreadNotificationCountsForRooms(ctx, req.Device.UserID, req.Rooms)
+ countsByRoom, err := snapshot.GetUserUnreadNotificationCountsForRooms(ctx, req.Device.UserID, req.Rooms)
if err != nil {
req.Log.WithError(err).Error("GetUserUnreadNotificationCountsForRooms failed")
return from
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 0ab6de88..89c5ba35 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -5,7 +5,6 @@ import (
"database/sql"
"fmt"
"sort"
- "sync"
"time"
"github.com/matrix-org/dendrite/internal/caching"
@@ -18,7 +17,6 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
- "go.uber.org/atomic"
"github.com/matrix-org/dendrite/syncapi/notifier"
)
@@ -33,44 +31,23 @@ const PDU_STREAM_WORKERS = 256
const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8
type PDUStreamProvider struct {
- StreamProvider
+ DefaultStreamProvider
- tasks chan func()
- workers atomic.Int32
// userID+deviceID -> lazy loading cache
lazyLoadCache caching.LazyLoadCache
rsAPI roomserverAPI.SyncRoomserverAPI
notifier *notifier.Notifier
}
-func (p *PDUStreamProvider) worker() {
- defer p.workers.Dec()
- for {
- select {
- case f := <-p.tasks:
- f()
- case <-time.After(time.Second * 10):
- return
- }
- }
-}
-
-func (p *PDUStreamProvider) queue(f func()) {
- if p.workers.Load() < PDU_STREAM_WORKERS {
- p.workers.Inc()
- go p.worker()
- }
- p.tasks <- f
-}
-
-func (p *PDUStreamProvider) Setup() {
- p.StreamProvider.Setup()
- p.tasks = make(chan func(), PDU_STREAM_QUEUESIZE)
+func (p *PDUStreamProvider) Setup(
+ ctx context.Context, snapshot storage.DatabaseTransaction,
+) {
+ p.DefaultStreamProvider.Setup(ctx, snapshot)
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
- id, err := p.DB.MaxStreamPositionForPDUs(context.Background())
+ id, err := snapshot.MaxStreamPositionForPDUs(ctx)
if err != nil {
panic(err)
}
@@ -79,6 +56,7 @@ func (p *PDUStreamProvider) Setup() {
func (p *PDUStreamProvider) CompleteSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
) types.StreamPosition {
from := types.StreamPosition(0)
@@ -94,7 +72,7 @@ func (p *PDUStreamProvider) CompleteSync(
}
// Extract room state and recent events for all rooms the user is joined to.
- joinedRoomIDs, err := p.DB.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join)
+ joinedRoomIDs, err := snapshot.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join)
if err != nil {
req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed")
return from
@@ -103,7 +81,7 @@ func (p *PDUStreamProvider) CompleteSync(
stateFilter := req.Filter.Room.State
eventFilter := req.Filter.Room.Timeline
- if err = p.addIgnoredUsersToFilter(ctx, req, &eventFilter); err != nil {
+ if err = p.addIgnoredUsersToFilter(ctx, snapshot, req, &eventFilter); err != nil {
req.Log.WithError(err).Error("unable to update event filter with ignored users")
}
@@ -117,33 +95,20 @@ func (p *PDUStreamProvider) CompleteSync(
}
// Build up a /sync response. Add joined rooms.
- var reqMutex sync.Mutex
- var reqWaitGroup sync.WaitGroup
- reqWaitGroup.Add(len(joinedRoomIDs))
- for _, room := range joinedRoomIDs {
- roomID := room
- p.queue(func() {
- defer reqWaitGroup.Done()
-
- jr, jerr := p.getJoinResponseForCompleteSync(
- ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false,
- )
- if jerr != nil {
- req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed")
- return
- }
-
- reqMutex.Lock()
- defer reqMutex.Unlock()
- req.Response.Rooms.Join[roomID] = *jr
- req.Rooms[roomID] = gomatrixserverlib.Join
- })
+ for _, roomID := range joinedRoomIDs {
+ jr, jerr := p.getJoinResponseForCompleteSync(
+ ctx, snapshot, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false,
+ )
+ if jerr != nil {
+ req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed")
+ continue // return from
+ }
+ req.Response.Rooms.Join[roomID] = *jr
+ req.Rooms[roomID] = gomatrixserverlib.Join
}
- reqWaitGroup.Wait()
-
// Add peeked rooms.
- peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
+ peeks, err := snapshot.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
if err != nil {
req.Log.WithError(err).Error("p.DB.PeeksInRange failed")
return from
@@ -152,11 +117,11 @@ func (p *PDUStreamProvider) CompleteSync(
if !peek.Deleted {
var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync(
- ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, true,
+ ctx, snapshot, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, true,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
- return from
+ continue // return from
}
req.Response.Rooms.Peek[peek.RoomID] = *jr
}
@@ -167,6 +132,7 @@ func (p *PDUStreamProvider) CompleteSync(
func (p *PDUStreamProvider) IncrementalSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
from, to types.StreamPosition,
) (newPos types.StreamPosition) {
@@ -184,12 +150,12 @@ func (p *PDUStreamProvider) IncrementalSync(
eventFilter := req.Filter.Room.Timeline
if req.WantFullState {
- if stateDeltas, syncJoinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
+ if stateDeltas, syncJoinedRooms, err = snapshot.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
req.Log.WithError(err).Error("p.DB.GetStateDeltasForFullStateSync failed")
return
}
} else {
- if stateDeltas, syncJoinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
+ if stateDeltas, syncJoinedRooms, err = snapshot.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
req.Log.WithError(err).Error("p.DB.GetStateDeltas failed")
return
}
@@ -203,7 +169,7 @@ func (p *PDUStreamProvider) IncrementalSync(
return to
}
- if err = p.addIgnoredUsersToFilter(ctx, req, &eventFilter); err != nil {
+ if err = p.addIgnoredUsersToFilter(ctx, snapshot, req, &eventFilter); err != nil {
req.Log.WithError(err).Error("unable to update event filter with ignored users")
}
@@ -222,7 +188,7 @@ func (p *PDUStreamProvider) IncrementalSync(
}
}
var pos types.StreamPosition
- if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil {
+ if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
return to
}
@@ -244,6 +210,7 @@ func (p *PDUStreamProvider) IncrementalSync(
// nolint:gocyclo
func (p *PDUStreamProvider) addRoomDeltaToResponse(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
device *userapi.Device,
r types.Range,
delta types.StateDelta,
@@ -260,7 +227,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// This is all "okay" assuming history_visibility == "shared" which it is by default.
r.To = delta.MembershipPos
}
- recentStreamEvents, limited, err := p.DB.RecentEvents(
+ recentStreamEvents, limited, err := snapshot.RecentEvents(
ctx, delta.RoomID, r,
eventFilter, true, true,
)
@@ -270,9 +237,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
return r.From, fmt.Errorf("p.DB.RecentEvents: %w", err)
}
- recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
+ recentEvents := snapshot.StreamEventsToEvents(device, recentStreamEvents)
delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
- prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents)
+ prevBatch, err := snapshot.GetBackwardTopologyPos(ctx, recentStreamEvents)
if err != nil {
return r.From, fmt.Errorf("p.DB.GetBackwardTopologyPos: %w", err)
}
@@ -291,7 +258,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
latestPosition := r.To
updateLatestPosition := func(mostRecentEventID string) {
var pos types.StreamPosition
- if _, pos, err = p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil {
+ if _, pos, err = snapshot.PositionInTopology(ctx, mostRecentEventID); err == nil {
switch {
case r.Backwards && pos < latestPosition:
fallthrough
@@ -303,7 +270,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
if stateFilter.LazyLoadMembers {
delta.StateEvents, err = p.lazyLoadMembers(
- ctx, delta.RoomID, true, limited, stateFilter,
+ ctx, snapshot, delta.RoomID, true, limited, stateFilter,
device, recentEvents, delta.StateEvents,
)
if err != nil && err != sql.ErrNoRows {
@@ -320,7 +287,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
// Applies the history visibility rules
- events, err := applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, delta.RoomID, device.UserID, eventFilter.Limit, recentEvents)
+ events, err := applyHistoryVisibilityFilter(ctx, snapshot, p.rsAPI, delta.RoomID, device.UserID, eventFilter.Limit, recentEvents)
if err != nil {
logrus.WithError(err).Error("unable to apply history visibility filter")
}
@@ -336,7 +303,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
case gomatrixserverlib.Join:
jr := types.NewJoinResponse()
if hasMembershipChange {
- p.addRoomSummary(ctx, jr, delta.RoomID, device.UserID, latestPosition)
+ p.addRoomSummary(ctx, snapshot, jr, delta.RoomID, device.UserID, latestPosition)
}
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync)
@@ -376,7 +343,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// sure we always return the required events in the timeline.
func applyHistoryVisibilityFilter(
ctx context.Context,
- db storage.Database,
+ snapshot storage.DatabaseTransaction,
rsAPI roomserverAPI.SyncRoomserverAPI,
roomID, userID string,
limit int,
@@ -384,7 +351,7 @@ func applyHistoryVisibilityFilter(
) ([]*gomatrixserverlib.HeaderedEvent, error) {
// We need to make sure we always include the latest states events, if they are in the timeline.
// We grep at least limit * 2 events, to ensure we really get the needed events.
- stateEvents, err := db.CurrentState(ctx, roomID, &gomatrixserverlib.StateFilter{Limit: limit * 2}, nil)
+ stateEvents, err := snapshot.CurrentState(ctx, roomID, &gomatrixserverlib.StateFilter{Limit: limit * 2}, nil)
if err != nil {
// Not a fatal error, we can continue without the stateEvents,
// they are only needed if there are state events in the timeline.
@@ -395,7 +362,7 @@ func applyHistoryVisibilityFilter(
alwaysIncludeIDs[ev.EventID()] = struct{}{}
}
startTime := time.Now()
- events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync")
+ events, err := internal.ApplyHistoryVisibilityFilter(ctx, snapshot, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync")
if err != nil {
return nil, err
}
@@ -408,10 +375,10 @@ func applyHistoryVisibilityFilter(
return events, nil
}
-func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) {
+func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, snapshot storage.DatabaseTransaction, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) {
// Work out how many members are in the room.
- joinedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Join, latestPosition)
- invitedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Invite, latestPosition)
+ joinedCount, _ := snapshot.MembershipCount(ctx, roomID, gomatrixserverlib.Join, latestPosition)
+ invitedCount, _ := snapshot.MembershipCount(ctx, roomID, gomatrixserverlib.Invite, latestPosition)
jr.Summary.JoinedMemberCount = &joinedCount
jr.Summary.InvitedMemberCount = &invitedCount
@@ -439,7 +406,7 @@ func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinRe
}
}
}
- heroes, err := p.DB.GetRoomHeroes(ctx, roomID, userID, []string{"join", "invite"})
+ heroes, err := snapshot.GetRoomHeroes(ctx, roomID, userID, []string{"join", "invite"})
if err != nil {
return
}
@@ -449,6 +416,7 @@ func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinRe
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
roomID string,
r types.Range,
stateFilter *gomatrixserverlib.StateFilter,
@@ -460,7 +428,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
jr = types.NewJoinResponse()
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
- recentStreamEvents, limited, err := p.DB.RecentEvents(
+ recentStreamEvents, limited, err := snapshot.RecentEvents(
ctx, roomID, r, eventFilter, true, true,
)
if err != nil {
@@ -484,7 +452,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
}
}
- stateEvents, err := p.DB.CurrentState(ctx, roomID, stateFilter, excludingEventIDs)
+ stateEvents, err := snapshot.CurrentState(ctx, roomID, stateFilter, excludingEventIDs)
if err != nil {
return
}
@@ -494,7 +462,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
var prevBatch *types.TopologyToken
if len(recentStreamEvents) > 0 {
var backwardTopologyPos, backwardStreamPos types.StreamPosition
- backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, recentStreamEvents[0].EventID())
+ backwardTopologyPos, backwardStreamPos, err = snapshot.PositionInTopology(ctx, recentStreamEvents[0].EventID())
if err != nil {
return
}
@@ -505,18 +473,18 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
prevBatch.Decrement()
}
- p.addRoomSummary(ctx, jr, roomID, device.UserID, r.From)
+ p.addRoomSummary(ctx, snapshot, jr, roomID, device.UserID, r.From)
// We don't include a device here as we don't need to send down
// transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
- recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
+ recentEvents := snapshot.StreamEventsToEvents(device, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
events := recentEvents
// Only apply history visibility checks if the response is for joined rooms
if !isPeek {
- events, err = applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, roomID, device.UserID, eventFilter.Limit, recentEvents)
+ events, err = applyHistoryVisibilityFilter(ctx, snapshot, p.rsAPI, roomID, device.UserID, eventFilter.Limit, recentEvents)
if err != nil {
logrus.WithError(err).Error("unable to apply history visibility filter")
}
@@ -530,7 +498,8 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
if err != nil {
return nil, err
}
- stateEvents, err = p.lazyLoadMembers(ctx, roomID,
+ stateEvents, err = p.lazyLoadMembers(
+ ctx, snapshot, roomID,
false, limited, stateFilter,
device, recentEvents, stateEvents,
)
@@ -549,7 +518,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
}
func (p *PDUStreamProvider) lazyLoadMembers(
- ctx context.Context, roomID string,
+ ctx context.Context, snapshot storage.DatabaseTransaction, roomID string,
incremental, limited bool, stateFilter *gomatrixserverlib.StateFilter,
device *userapi.Device,
timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent,
@@ -598,7 +567,7 @@ func (p *PDUStreamProvider) lazyLoadMembers(
filter.Limit = stateFilter.Limit
filter.Senders = &wantUsers
filter.Types = &[]string{gomatrixserverlib.MRoomMember}
- memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &filter)
+ memberships, err := snapshot.GetStateEventsForRoom(ctx, roomID, &filter)
if err != nil {
return stateEvents, err
}
@@ -612,8 +581,8 @@ func (p *PDUStreamProvider) lazyLoadMembers(
// addIgnoredUsersToFilter adds ignored users to the eventfilter and
// the syncreq itself for further use in streams.
-func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error {
- ignores, err := p.DB.IgnoresForUser(ctx, req.Device.UserID)
+func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error {
+ ignores, err := snapshot.IgnoresForUser(ctx, req.Device.UserID)
if err != nil {
if err == sql.ErrNoRows {
return nil
diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go
index 15db4d30..81cea7d5 100644
--- a/syncapi/streams/stream_presence.go
+++ b/syncapi/streams/stream_presence.go
@@ -23,20 +23,26 @@ import (
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/syncapi/notifier"
+ "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
)
type PresenceStreamProvider struct {
- StreamProvider
+ DefaultStreamProvider
// cache contains previously sent presence updates to avoid unneeded updates
cache sync.Map
notifier *notifier.Notifier
}
-func (p *PresenceStreamProvider) Setup() {
- p.StreamProvider.Setup()
+func (p *PresenceStreamProvider) Setup(
+ ctx context.Context, snapshot storage.DatabaseTransaction,
+) {
+ p.DefaultStreamProvider.Setup(ctx, snapshot)
- id, err := p.DB.MaxStreamPositionForPresence(context.Background())
+ p.latestMutex.Lock()
+ defer p.latestMutex.Unlock()
+
+ id, err := snapshot.MaxStreamPositionForPresence(ctx)
if err != nil {
panic(err)
}
@@ -45,18 +51,20 @@ func (p *PresenceStreamProvider) Setup() {
func (p *PresenceStreamProvider) CompleteSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
) types.StreamPosition {
- return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
+ return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx))
}
func (p *PresenceStreamProvider) IncrementalSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
// We pull out a larger number than the filter asks for, since we're filtering out events later
- presences, err := p.DB.PresenceAfter(ctx, from, gomatrixserverlib.EventFilter{Limit: 1000})
+ presences, err := snapshot.PresenceAfter(ctx, from, gomatrixserverlib.EventFilter{Limit: 1000})
if err != nil {
req.Log.WithError(err).Error("p.DB.PresenceAfter failed")
return from
@@ -84,7 +92,7 @@ func (p *PresenceStreamProvider) IncrementalSync(
}
// Bear in mind that this might return nil, but at least populating
// a nil means that there's a map entry so we won't repeat this call.
- presences[roomUsers[i]], err = p.DB.GetPresence(ctx, roomUsers[i])
+ presences[roomUsers[i]], err = snapshot.GetPresence(ctx, roomUsers[i])
if err != nil {
req.Log.WithError(err).Error("unable to query presence for user")
return from
diff --git a/syncapi/streams/stream_receipt.go b/syncapi/streams/stream_receipt.go
index f4e84c7d..8818a553 100644
--- a/syncapi/streams/stream_receipt.go
+++ b/syncapi/streams/stream_receipt.go
@@ -4,18 +4,24 @@ import (
"context"
"encoding/json"
+ "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
type ReceiptStreamProvider struct {
- StreamProvider
+ DefaultStreamProvider
}
-func (p *ReceiptStreamProvider) Setup() {
- p.StreamProvider.Setup()
+func (p *ReceiptStreamProvider) Setup(
+ ctx context.Context, snapshot storage.DatabaseTransaction,
+) {
+ p.DefaultStreamProvider.Setup(ctx, snapshot)
- id, err := p.DB.MaxStreamPositionForReceipts(context.Background())
+ p.latestMutex.Lock()
+ defer p.latestMutex.Unlock()
+
+ id, err := snapshot.MaxStreamPositionForReceipts(ctx)
if err != nil {
panic(err)
}
@@ -24,13 +30,15 @@ func (p *ReceiptStreamProvider) Setup() {
func (p *ReceiptStreamProvider) CompleteSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
) types.StreamPosition {
- return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
+ return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx))
}
func (p *ReceiptStreamProvider) IncrementalSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
@@ -41,7 +49,7 @@ func (p *ReceiptStreamProvider) IncrementalSync(
}
}
- lastPos, receipts, err := p.DB.RoomReceiptsAfter(ctx, joinedRooms, from)
+ lastPos, receipts, err := snapshot.RoomReceiptsAfter(ctx, joinedRooms, from)
if err != nil {
req.Log.WithError(err).Error("p.DB.RoomReceiptsAfter failed")
return from
diff --git a/syncapi/streams/stream_sendtodevice.go b/syncapi/streams/stream_sendtodevice.go
index 31c6187c..00b67cc4 100644
--- a/syncapi/streams/stream_sendtodevice.go
+++ b/syncapi/streams/stream_sendtodevice.go
@@ -3,17 +3,23 @@ package streams
import (
"context"
+ "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
)
type SendToDeviceStreamProvider struct {
- StreamProvider
+ DefaultStreamProvider
}
-func (p *SendToDeviceStreamProvider) Setup() {
- p.StreamProvider.Setup()
+func (p *SendToDeviceStreamProvider) Setup(
+ ctx context.Context, snapshot storage.DatabaseTransaction,
+) {
+ p.DefaultStreamProvider.Setup(ctx, snapshot)
- id, err := p.DB.MaxStreamPositionForSendToDeviceMessages(context.Background())
+ p.latestMutex.Lock()
+ defer p.latestMutex.Unlock()
+
+ id, err := snapshot.MaxStreamPositionForSendToDeviceMessages(ctx)
if err != nil {
panic(err)
}
@@ -22,18 +28,20 @@ func (p *SendToDeviceStreamProvider) Setup() {
func (p *SendToDeviceStreamProvider) CompleteSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
) types.StreamPosition {
- return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
+ return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx))
}
func (p *SendToDeviceStreamProvider) IncrementalSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
// See if we have any new tasks to do for the send-to-device messaging.
- lastPos, events, err := p.DB.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, from, to)
+ lastPos, events, err := snapshot.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, from, to)
if err != nil {
req.Log.WithError(err).Error("p.DB.SendToDeviceUpdatesForSync failed")
return from
diff --git a/syncapi/streams/stream_typing.go b/syncapi/streams/stream_typing.go
index f781065b..a6f7c7a0 100644
--- a/syncapi/streams/stream_typing.go
+++ b/syncapi/streams/stream_typing.go
@@ -5,24 +5,27 @@ import (
"encoding/json"
"github.com/matrix-org/dendrite/internal/caching"
+ "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
type TypingStreamProvider struct {
- StreamProvider
+ DefaultStreamProvider
EDUCache *caching.EDUCache
}
func (p *TypingStreamProvider) CompleteSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
) types.StreamPosition {
- return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
+ return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx))
}
func (p *TypingStreamProvider) IncrementalSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
diff --git a/syncapi/streams/streamprovider.go b/syncapi/streams/streamprovider.go
new file mode 100644
index 00000000..8b12e2eb
--- /dev/null
+++ b/syncapi/streams/streamprovider.go
@@ -0,0 +1,28 @@
+package streams
+
+import (
+ "context"
+
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/types"
+)
+
+type StreamProvider interface {
+ Setup(ctx context.Context, snapshot storage.DatabaseTransaction)
+
+ // Advance will update the latest position of the stream based on
+ // an update and will wake callers waiting on StreamNotifyAfter.
+ Advance(latest types.StreamPosition)
+
+ // CompleteSync will update the response to include all updates as needed
+ // for a complete sync. It will always return immediately.
+ CompleteSync(ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest) types.StreamPosition
+
+ // IncrementalSync will update the response to include all updates between
+ // the from and to sync positions. It will always return immediately,
+ // making no changes if the range contains no updates.
+ IncrementalSync(ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition) types.StreamPosition
+
+ // LatestPosition returns the latest stream position for this stream.
+ LatestPosition(ctx context.Context) types.StreamPosition
+}
diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go
index dbc053bd..eccbb3a4 100644
--- a/syncapi/streams/streams.go
+++ b/syncapi/streams/streams.go
@@ -13,15 +13,15 @@ import (
)
type Streams struct {
- PDUStreamProvider types.StreamProvider
- TypingStreamProvider types.StreamProvider
- ReceiptStreamProvider types.StreamProvider
- InviteStreamProvider types.StreamProvider
- SendToDeviceStreamProvider types.StreamProvider
- AccountDataStreamProvider types.StreamProvider
- DeviceListStreamProvider types.StreamProvider
- NotificationDataStreamProvider types.StreamProvider
- PresenceStreamProvider types.StreamProvider
+ PDUStreamProvider StreamProvider
+ TypingStreamProvider StreamProvider
+ ReceiptStreamProvider StreamProvider
+ InviteStreamProvider StreamProvider
+ SendToDeviceStreamProvider StreamProvider
+ AccountDataStreamProvider StreamProvider
+ DeviceListStreamProvider StreamProvider
+ NotificationDataStreamProvider StreamProvider
+ PresenceStreamProvider StreamProvider
}
func NewSyncStreamProviders(
@@ -31,51 +31,58 @@ func NewSyncStreamProviders(
) *Streams {
streams := &Streams{
PDUStreamProvider: &PDUStreamProvider{
- StreamProvider: StreamProvider{DB: d},
- lazyLoadCache: lazyLoadCache,
- rsAPI: rsAPI,
- notifier: notifier,
+ DefaultStreamProvider: DefaultStreamProvider{DB: d},
+ lazyLoadCache: lazyLoadCache,
+ rsAPI: rsAPI,
+ notifier: notifier,
},
TypingStreamProvider: &TypingStreamProvider{
- StreamProvider: StreamProvider{DB: d},
- EDUCache: eduCache,
+ DefaultStreamProvider: DefaultStreamProvider{DB: d},
+ EDUCache: eduCache,
},
ReceiptStreamProvider: &ReceiptStreamProvider{
- StreamProvider: StreamProvider{DB: d},
+ DefaultStreamProvider: DefaultStreamProvider{DB: d},
},
InviteStreamProvider: &InviteStreamProvider{
- StreamProvider: StreamProvider{DB: d},
+ DefaultStreamProvider: DefaultStreamProvider{DB: d},
},
SendToDeviceStreamProvider: &SendToDeviceStreamProvider{
- StreamProvider: StreamProvider{DB: d},
+ DefaultStreamProvider: DefaultStreamProvider{DB: d},
},
AccountDataStreamProvider: &AccountDataStreamProvider{
- StreamProvider: StreamProvider{DB: d},
- userAPI: userAPI,
+ DefaultStreamProvider: DefaultStreamProvider{DB: d},
+ userAPI: userAPI,
},
NotificationDataStreamProvider: &NotificationDataStreamProvider{
- StreamProvider: StreamProvider{DB: d},
+ DefaultStreamProvider: DefaultStreamProvider{DB: d},
},
DeviceListStreamProvider: &DeviceListStreamProvider{
- StreamProvider: StreamProvider{DB: d},
- rsAPI: rsAPI,
- keyAPI: keyAPI,
+ DefaultStreamProvider: DefaultStreamProvider{DB: d},
+ rsAPI: rsAPI,
+ keyAPI: keyAPI,
},
PresenceStreamProvider: &PresenceStreamProvider{
- StreamProvider: StreamProvider{DB: d},
- notifier: notifier,
+ DefaultStreamProvider: DefaultStreamProvider{DB: d},
+ notifier: notifier,
},
}
- streams.PDUStreamProvider.Setup()
- streams.TypingStreamProvider.Setup()
- streams.ReceiptStreamProvider.Setup()
- streams.InviteStreamProvider.Setup()
- streams.SendToDeviceStreamProvider.Setup()
- streams.AccountDataStreamProvider.Setup()
- streams.NotificationDataStreamProvider.Setup()
- streams.DeviceListStreamProvider.Setup()
- streams.PresenceStreamProvider.Setup()
+ ctx := context.TODO()
+ snapshot, err := d.NewDatabaseSnapshot(ctx)
+ if err != nil {
+ panic(err)
+ }
+ defer snapshot.Rollback() // nolint:errcheck
+
+ streams.PDUStreamProvider.Setup(ctx, snapshot)
+ streams.TypingStreamProvider.Setup(ctx, snapshot)
+ streams.ReceiptStreamProvider.Setup(ctx, snapshot)
+ streams.InviteStreamProvider.Setup(ctx, snapshot)
+ streams.SendToDeviceStreamProvider.Setup(ctx, snapshot)
+ streams.AccountDataStreamProvider.Setup(ctx, snapshot)
+ streams.NotificationDataStreamProvider.Setup(ctx, snapshot)
+ streams.DeviceListStreamProvider.Setup(ctx, snapshot)
+ streams.PresenceStreamProvider.Setup(ctx, snapshot)
return streams
}
diff --git a/syncapi/streams/template_stream.go b/syncapi/streams/template_stream.go
index 15074cc1..f208d84e 100644
--- a/syncapi/streams/template_stream.go
+++ b/syncapi/streams/template_stream.go
@@ -8,16 +8,18 @@ import (
"github.com/matrix-org/dendrite/syncapi/types"
)
-type StreamProvider struct {
+type DefaultStreamProvider struct {
DB storage.Database
latest types.StreamPosition
latestMutex sync.RWMutex
}
-func (p *StreamProvider) Setup() {
+func (p *DefaultStreamProvider) Setup(
+ ctx context.Context, snapshot storage.DatabaseTransaction,
+) {
}
-func (p *StreamProvider) Advance(
+func (p *DefaultStreamProvider) Advance(
latest types.StreamPosition,
) {
p.latestMutex.Lock()
@@ -28,7 +30,7 @@ func (p *StreamProvider) Advance(
}
}
-func (p *StreamProvider) LatestPosition(
+func (p *DefaultStreamProvider) LatestPosition(
ctx context.Context,
) types.StreamPosition {
p.latestMutex.RLock()