aboutsummaryrefslogtreecommitdiff
path: root/syncapi/streams/stream_pdu.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/streams/stream_pdu.go')
-rw-r--r--syncapi/streams/stream_pdu.go141
1 files changed, 55 insertions, 86 deletions
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 0ab6de88..89c5ba35 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -5,7 +5,6 @@ import (
"database/sql"
"fmt"
"sort"
- "sync"
"time"
"github.com/matrix-org/dendrite/internal/caching"
@@ -18,7 +17,6 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
- "go.uber.org/atomic"
"github.com/matrix-org/dendrite/syncapi/notifier"
)
@@ -33,44 +31,23 @@ const PDU_STREAM_WORKERS = 256
const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8
type PDUStreamProvider struct {
- StreamProvider
+ DefaultStreamProvider
- tasks chan func()
- workers atomic.Int32
// userID+deviceID -> lazy loading cache
lazyLoadCache caching.LazyLoadCache
rsAPI roomserverAPI.SyncRoomserverAPI
notifier *notifier.Notifier
}
-func (p *PDUStreamProvider) worker() {
- defer p.workers.Dec()
- for {
- select {
- case f := <-p.tasks:
- f()
- case <-time.After(time.Second * 10):
- return
- }
- }
-}
-
-func (p *PDUStreamProvider) queue(f func()) {
- if p.workers.Load() < PDU_STREAM_WORKERS {
- p.workers.Inc()
- go p.worker()
- }
- p.tasks <- f
-}
-
-func (p *PDUStreamProvider) Setup() {
- p.StreamProvider.Setup()
- p.tasks = make(chan func(), PDU_STREAM_QUEUESIZE)
+func (p *PDUStreamProvider) Setup(
+ ctx context.Context, snapshot storage.DatabaseTransaction,
+) {
+ p.DefaultStreamProvider.Setup(ctx, snapshot)
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
- id, err := p.DB.MaxStreamPositionForPDUs(context.Background())
+ id, err := snapshot.MaxStreamPositionForPDUs(ctx)
if err != nil {
panic(err)
}
@@ -79,6 +56,7 @@ func (p *PDUStreamProvider) Setup() {
func (p *PDUStreamProvider) CompleteSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
) types.StreamPosition {
from := types.StreamPosition(0)
@@ -94,7 +72,7 @@ func (p *PDUStreamProvider) CompleteSync(
}
// Extract room state and recent events for all rooms the user is joined to.
- joinedRoomIDs, err := p.DB.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join)
+ joinedRoomIDs, err := snapshot.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join)
if err != nil {
req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed")
return from
@@ -103,7 +81,7 @@ func (p *PDUStreamProvider) CompleteSync(
stateFilter := req.Filter.Room.State
eventFilter := req.Filter.Room.Timeline
- if err = p.addIgnoredUsersToFilter(ctx, req, &eventFilter); err != nil {
+ if err = p.addIgnoredUsersToFilter(ctx, snapshot, req, &eventFilter); err != nil {
req.Log.WithError(err).Error("unable to update event filter with ignored users")
}
@@ -117,33 +95,20 @@ func (p *PDUStreamProvider) CompleteSync(
}
// Build up a /sync response. Add joined rooms.
- var reqMutex sync.Mutex
- var reqWaitGroup sync.WaitGroup
- reqWaitGroup.Add(len(joinedRoomIDs))
- for _, room := range joinedRoomIDs {
- roomID := room
- p.queue(func() {
- defer reqWaitGroup.Done()
-
- jr, jerr := p.getJoinResponseForCompleteSync(
- ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false,
- )
- if jerr != nil {
- req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed")
- return
- }
-
- reqMutex.Lock()
- defer reqMutex.Unlock()
- req.Response.Rooms.Join[roomID] = *jr
- req.Rooms[roomID] = gomatrixserverlib.Join
- })
+ for _, roomID := range joinedRoomIDs {
+ jr, jerr := p.getJoinResponseForCompleteSync(
+ ctx, snapshot, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false,
+ )
+ if jerr != nil {
+ req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed")
+ continue // return from
+ }
+ req.Response.Rooms.Join[roomID] = *jr
+ req.Rooms[roomID] = gomatrixserverlib.Join
}
- reqWaitGroup.Wait()
-
// Add peeked rooms.
- peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
+ peeks, err := snapshot.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
if err != nil {
req.Log.WithError(err).Error("p.DB.PeeksInRange failed")
return from
@@ -152,11 +117,11 @@ func (p *PDUStreamProvider) CompleteSync(
if !peek.Deleted {
var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync(
- ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, true,
+ ctx, snapshot, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, true,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
- return from
+ continue // return from
}
req.Response.Rooms.Peek[peek.RoomID] = *jr
}
@@ -167,6 +132,7 @@ func (p *PDUStreamProvider) CompleteSync(
func (p *PDUStreamProvider) IncrementalSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
from, to types.StreamPosition,
) (newPos types.StreamPosition) {
@@ -184,12 +150,12 @@ func (p *PDUStreamProvider) IncrementalSync(
eventFilter := req.Filter.Room.Timeline
if req.WantFullState {
- if stateDeltas, syncJoinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
+ if stateDeltas, syncJoinedRooms, err = snapshot.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
req.Log.WithError(err).Error("p.DB.GetStateDeltasForFullStateSync failed")
return
}
} else {
- if stateDeltas, syncJoinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
+ if stateDeltas, syncJoinedRooms, err = snapshot.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
req.Log.WithError(err).Error("p.DB.GetStateDeltas failed")
return
}
@@ -203,7 +169,7 @@ func (p *PDUStreamProvider) IncrementalSync(
return to
}
- if err = p.addIgnoredUsersToFilter(ctx, req, &eventFilter); err != nil {
+ if err = p.addIgnoredUsersToFilter(ctx, snapshot, req, &eventFilter); err != nil {
req.Log.WithError(err).Error("unable to update event filter with ignored users")
}
@@ -222,7 +188,7 @@ func (p *PDUStreamProvider) IncrementalSync(
}
}
var pos types.StreamPosition
- if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil {
+ if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
return to
}
@@ -244,6 +210,7 @@ func (p *PDUStreamProvider) IncrementalSync(
// nolint:gocyclo
func (p *PDUStreamProvider) addRoomDeltaToResponse(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
device *userapi.Device,
r types.Range,
delta types.StateDelta,
@@ -260,7 +227,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// This is all "okay" assuming history_visibility == "shared" which it is by default.
r.To = delta.MembershipPos
}
- recentStreamEvents, limited, err := p.DB.RecentEvents(
+ recentStreamEvents, limited, err := snapshot.RecentEvents(
ctx, delta.RoomID, r,
eventFilter, true, true,
)
@@ -270,9 +237,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
return r.From, fmt.Errorf("p.DB.RecentEvents: %w", err)
}
- recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
+ recentEvents := snapshot.StreamEventsToEvents(device, recentStreamEvents)
delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
- prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents)
+ prevBatch, err := snapshot.GetBackwardTopologyPos(ctx, recentStreamEvents)
if err != nil {
return r.From, fmt.Errorf("p.DB.GetBackwardTopologyPos: %w", err)
}
@@ -291,7 +258,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
latestPosition := r.To
updateLatestPosition := func(mostRecentEventID string) {
var pos types.StreamPosition
- if _, pos, err = p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil {
+ if _, pos, err = snapshot.PositionInTopology(ctx, mostRecentEventID); err == nil {
switch {
case r.Backwards && pos < latestPosition:
fallthrough
@@ -303,7 +270,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
if stateFilter.LazyLoadMembers {
delta.StateEvents, err = p.lazyLoadMembers(
- ctx, delta.RoomID, true, limited, stateFilter,
+ ctx, snapshot, delta.RoomID, true, limited, stateFilter,
device, recentEvents, delta.StateEvents,
)
if err != nil && err != sql.ErrNoRows {
@@ -320,7 +287,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
// Applies the history visibility rules
- events, err := applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, delta.RoomID, device.UserID, eventFilter.Limit, recentEvents)
+ events, err := applyHistoryVisibilityFilter(ctx, snapshot, p.rsAPI, delta.RoomID, device.UserID, eventFilter.Limit, recentEvents)
if err != nil {
logrus.WithError(err).Error("unable to apply history visibility filter")
}
@@ -336,7 +303,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
case gomatrixserverlib.Join:
jr := types.NewJoinResponse()
if hasMembershipChange {
- p.addRoomSummary(ctx, jr, delta.RoomID, device.UserID, latestPosition)
+ p.addRoomSummary(ctx, snapshot, jr, delta.RoomID, device.UserID, latestPosition)
}
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync)
@@ -376,7 +343,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// sure we always return the required events in the timeline.
func applyHistoryVisibilityFilter(
ctx context.Context,
- db storage.Database,
+ snapshot storage.DatabaseTransaction,
rsAPI roomserverAPI.SyncRoomserverAPI,
roomID, userID string,
limit int,
@@ -384,7 +351,7 @@ func applyHistoryVisibilityFilter(
) ([]*gomatrixserverlib.HeaderedEvent, error) {
// We need to make sure we always include the latest states events, if they are in the timeline.
// We grep at least limit * 2 events, to ensure we really get the needed events.
- stateEvents, err := db.CurrentState(ctx, roomID, &gomatrixserverlib.StateFilter{Limit: limit * 2}, nil)
+ stateEvents, err := snapshot.CurrentState(ctx, roomID, &gomatrixserverlib.StateFilter{Limit: limit * 2}, nil)
if err != nil {
// Not a fatal error, we can continue without the stateEvents,
// they are only needed if there are state events in the timeline.
@@ -395,7 +362,7 @@ func applyHistoryVisibilityFilter(
alwaysIncludeIDs[ev.EventID()] = struct{}{}
}
startTime := time.Now()
- events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync")
+ events, err := internal.ApplyHistoryVisibilityFilter(ctx, snapshot, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync")
if err != nil {
return nil, err
}
@@ -408,10 +375,10 @@ func applyHistoryVisibilityFilter(
return events, nil
}
-func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) {
+func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, snapshot storage.DatabaseTransaction, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) {
// Work out how many members are in the room.
- joinedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Join, latestPosition)
- invitedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Invite, latestPosition)
+ joinedCount, _ := snapshot.MembershipCount(ctx, roomID, gomatrixserverlib.Join, latestPosition)
+ invitedCount, _ := snapshot.MembershipCount(ctx, roomID, gomatrixserverlib.Invite, latestPosition)
jr.Summary.JoinedMemberCount = &joinedCount
jr.Summary.InvitedMemberCount = &invitedCount
@@ -439,7 +406,7 @@ func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinRe
}
}
}
- heroes, err := p.DB.GetRoomHeroes(ctx, roomID, userID, []string{"join", "invite"})
+ heroes, err := snapshot.GetRoomHeroes(ctx, roomID, userID, []string{"join", "invite"})
if err != nil {
return
}
@@ -449,6 +416,7 @@ func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinRe
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
roomID string,
r types.Range,
stateFilter *gomatrixserverlib.StateFilter,
@@ -460,7 +428,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
jr = types.NewJoinResponse()
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
- recentStreamEvents, limited, err := p.DB.RecentEvents(
+ recentStreamEvents, limited, err := snapshot.RecentEvents(
ctx, roomID, r, eventFilter, true, true,
)
if err != nil {
@@ -484,7 +452,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
}
}
- stateEvents, err := p.DB.CurrentState(ctx, roomID, stateFilter, excludingEventIDs)
+ stateEvents, err := snapshot.CurrentState(ctx, roomID, stateFilter, excludingEventIDs)
if err != nil {
return
}
@@ -494,7 +462,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
var prevBatch *types.TopologyToken
if len(recentStreamEvents) > 0 {
var backwardTopologyPos, backwardStreamPos types.StreamPosition
- backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, recentStreamEvents[0].EventID())
+ backwardTopologyPos, backwardStreamPos, err = snapshot.PositionInTopology(ctx, recentStreamEvents[0].EventID())
if err != nil {
return
}
@@ -505,18 +473,18 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
prevBatch.Decrement()
}
- p.addRoomSummary(ctx, jr, roomID, device.UserID, r.From)
+ p.addRoomSummary(ctx, snapshot, jr, roomID, device.UserID, r.From)
// We don't include a device here as we don't need to send down
// transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
- recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
+ recentEvents := snapshot.StreamEventsToEvents(device, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
events := recentEvents
// Only apply history visibility checks if the response is for joined rooms
if !isPeek {
- events, err = applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, roomID, device.UserID, eventFilter.Limit, recentEvents)
+ events, err = applyHistoryVisibilityFilter(ctx, snapshot, p.rsAPI, roomID, device.UserID, eventFilter.Limit, recentEvents)
if err != nil {
logrus.WithError(err).Error("unable to apply history visibility filter")
}
@@ -530,7 +498,8 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
if err != nil {
return nil, err
}
- stateEvents, err = p.lazyLoadMembers(ctx, roomID,
+ stateEvents, err = p.lazyLoadMembers(
+ ctx, snapshot, roomID,
false, limited, stateFilter,
device, recentEvents, stateEvents,
)
@@ -549,7 +518,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
}
func (p *PDUStreamProvider) lazyLoadMembers(
- ctx context.Context, roomID string,
+ ctx context.Context, snapshot storage.DatabaseTransaction, roomID string,
incremental, limited bool, stateFilter *gomatrixserverlib.StateFilter,
device *userapi.Device,
timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent,
@@ -598,7 +567,7 @@ func (p *PDUStreamProvider) lazyLoadMembers(
filter.Limit = stateFilter.Limit
filter.Senders = &wantUsers
filter.Types = &[]string{gomatrixserverlib.MRoomMember}
- memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &filter)
+ memberships, err := snapshot.GetStateEventsForRoom(ctx, roomID, &filter)
if err != nil {
return stateEvents, err
}
@@ -612,8 +581,8 @@ func (p *PDUStreamProvider) lazyLoadMembers(
// addIgnoredUsersToFilter adds ignored users to the eventfilter and
// the syncreq itself for further use in streams.
-func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error {
- ignores, err := p.DB.IgnoresForUser(ctx, req.Device.UserID)
+func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error {
+ ignores, err := snapshot.IgnoresForUser(ctx, req.Device.UserID)
if err != nil {
if err == sql.ErrNoRows {
return nil