aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/shared/syncserver.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/shared/syncserver.go')
-rw-r--r--syncapi/storage/shared/syncserver.go224
1 files changed, 184 insertions, 40 deletions
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index 6267dd3a..94580adb 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -30,7 +30,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
- "github.com/sirupsen/logrus"
+ log "github.com/sirupsen/logrus"
)
// Database is a temporary struct until we have made syncserver.go the same for both pq/sqlite
@@ -39,6 +39,7 @@ type Database struct {
DB *sql.DB
Writer sqlutil.Writer
Invites tables.Invites
+ Peeks tables.Peeks
AccountData tables.AccountData
OutputEvents tables.Events
Topology tables.Topology
@@ -120,6 +121,10 @@ func (d *Database) AllJoinedUsersInRooms(ctx context.Context) (map[string][]stri
return d.CurrentRoomState.SelectJoinedUsers(ctx)
}
+func (d *Database) AllPeekingDevicesInRooms(ctx context.Context) (map[string][]types.PeekingDevice, error) {
+ return d.Peeks.SelectPeekingDevices(ctx)
+}
+
func (d *Database) GetStateEvent(
ctx context.Context, roomID, evType, stateKey string,
) (*gomatrixserverlib.HeaderedEvent, error) {
@@ -141,7 +146,7 @@ func (d *Database) AddInviteEvent(
) (sp types.StreamPosition, err error) {
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
sp, err = d.Invites.InsertInviteEvent(ctx, txn, inviteEvent)
- return nil
+ return err
})
return
}
@@ -153,11 +158,41 @@ func (d *Database) RetireInviteEvent(
) (sp types.StreamPosition, err error) {
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
sp, err = d.Invites.DeleteInviteEvent(ctx, txn, inviteEventID)
- return nil
+ return err
})
return
}
+// AddPeek tracks the fact that a user has started peeking.
+// If the peek was successfully stored this returns the stream ID it was stored at.
+// Returns an error if there was a problem communicating with the database.
+func (d *Database) AddPeek(
+ ctx context.Context, roomID, userID, deviceID string,
+) (sp types.StreamPosition, err error) {
+ err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ sp, err = d.Peeks.InsertPeek(ctx, txn, roomID, userID, deviceID)
+ return err
+ })
+ return
+}
+
+// DeletePeeks tracks the fact that a user has stopped peeking from all devices
+// If the peeks was successfully deleted this returns the stream ID it was stored at.
+// Returns an error if there was a problem communicating with the database.
+func (d *Database) DeletePeeks(
+ ctx context.Context, roomID, userID string,
+) (sp types.StreamPosition, err error) {
+ err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ sp, err = d.Peeks.DeletePeeks(ctx, txn, roomID, userID)
+ return err
+ })
+ if err == sql.ErrNoRows {
+ sp = 0
+ err = nil
+ }
+ return
+}
+
// GetAccountDataInRange returns all account data for a given user inserted or
// updated between two given positions
// Returns a map following the format data[roomID] = []dataTypes
@@ -196,7 +231,7 @@ func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.Strea
"transaction_id", in[i].TransactionID.TransactionID,
)
if err != nil {
- logrus.WithFields(logrus.Fields{
+ log.WithFields(log.Fields{
"event_id": out[i].EventID(),
}).WithError(err).Warnf("Failed to add transaction ID to event")
}
@@ -389,7 +424,6 @@ func (d *Database) EventPositionInTopology(
func (d *Database) syncPositionTx(
ctx context.Context, txn *sql.Tx,
) (sp types.StreamingToken, err error) {
-
maxEventID, err := d.OutputEvents.SelectMaxEventID(ctx, txn)
if err != nil {
return sp, err
@@ -408,6 +442,13 @@ func (d *Database) syncPositionTx(
if maxInviteID > maxEventID {
maxEventID = maxInviteID
}
+ maxPeekID, err := d.Peeks.SelectMaxPeekID(ctx, txn)
+ if err != nil {
+ return sp, err
+ }
+ if maxPeekID > maxEventID {
+ maxEventID = maxPeekID
+ }
sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.EDUCache.GetLatestSyncPosition()), nil)
return
}
@@ -566,6 +607,8 @@ func (d *Database) IncrementalSync(
}
}
+ // TODO: handle EDUs in peeked rooms
+
err = d.addEDUDeltaToResponse(
fromPos, toPos, joinedRoomIDs, res,
)
@@ -582,7 +625,7 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda
return err
}
if len(redactedEvents) == 0 {
- logrus.WithField("event_id", redactedEventID).WithField("redaction_event", redactedBecause.EventID()).Warnf("missing redacted event for redaction")
+ log.WithField("event_id", redactedEventID).WithField("redaction_event", redactedBecause.EventID()).Warnf("missing redacted event for redaction")
return nil
}
eventToRedact := redactedEvents[0].Unwrap()
@@ -604,7 +647,7 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda
// nolint:nakedret
func (d *Database) getResponseWithPDUsForCompleteSync(
ctx context.Context, res *types.Response,
- userID string,
+ userID string, deviceID string,
numRecentEventsPerRoom int,
) (
toPos types.StreamingToken,
@@ -644,46 +687,32 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
// Build up a /sync response. Add joined rooms.
for _, roomID := range joinedRoomIDs {
- var stateEvents []gomatrixserverlib.HeaderedEvent
- stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, &stateFilter)
- if err != nil {
- return
- }
- // TODO: When filters are added, we may need to call this multiple times to get enough events.
- // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
- var recentStreamEvents []types.StreamEvent
- var limited bool
- recentStreamEvents, limited, err = d.OutputEvents.SelectRecentEvents(
- ctx, txn, roomID, r, numRecentEventsPerRoom, true, true,
+ var jr *types.JoinResponse
+ jr, err = d.getJoinResponseForCompleteSync(
+ ctx, txn, roomID, r, &stateFilter, numRecentEventsPerRoom,
)
if err != nil {
return
}
+ res.Rooms.Join[roomID] = *jr
+ }
- // Retrieve the backward topology position, i.e. the position of the
- // oldest event in the room's topology.
- var prevBatchStr string
- if len(recentStreamEvents) > 0 {
- var backwardTopologyPos, backwardStreamPos types.StreamPosition
- backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID())
+ // Add peeked rooms.
+ peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, deviceID, r)
+ if err != nil {
+ return
+ }
+ for _, peek := range peeks {
+ if !peek.Deleted {
+ var jr *types.JoinResponse
+ jr, err = d.getJoinResponseForCompleteSync(
+ ctx, txn, peek.RoomID, r, &stateFilter, numRecentEventsPerRoom,
+ )
if err != nil {
return
}
- prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos)
- prevBatch.Decrement()
- prevBatchStr = prevBatch.String()
+ res.Rooms.Peek[peek.RoomID] = *jr
}
-
- // We don't include a device here as we don't need to send down
- // transaction IDs for complete syncs
- recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents)
- stateEvents = removeDuplicates(stateEvents, recentEvents)
- jr := types.NewJoinResponse()
- jr.Timeline.PrevBatch = prevBatchStr
- jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- jr.Timeline.Limited = limited
- jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
- res.Rooms.Join[roomID] = *jr
}
if err = d.addInvitesToResponse(ctx, txn, userID, r, res); err != nil {
@@ -694,17 +723,68 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
return //res, toPos, joinedRoomIDs, err
}
+func (d *Database) getJoinResponseForCompleteSync(
+ ctx context.Context, txn *sql.Tx,
+ roomID string,
+ r types.Range,
+ stateFilter *gomatrixserverlib.StateFilter,
+ numRecentEventsPerRoom int,
+) (jr *types.JoinResponse, err error) {
+ var stateEvents []gomatrixserverlib.HeaderedEvent
+ stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter)
+ if err != nil {
+ return
+ }
+ // TODO: When filters are added, we may need to call this multiple times to get enough events.
+ // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
+ var recentStreamEvents []types.StreamEvent
+ var limited bool
+ recentStreamEvents, limited, err = d.OutputEvents.SelectRecentEvents(
+ ctx, txn, roomID, r, numRecentEventsPerRoom, true, true,
+ )
+ if err != nil {
+ return
+ }
+
+ // Retrieve the backward topology position, i.e. the position of the
+ // oldest event in the room's topology.
+ var prevBatchStr string
+ if len(recentStreamEvents) > 0 {
+ var backwardTopologyPos, backwardStreamPos types.StreamPosition
+ backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID())
+ if err != nil {
+ return
+ }
+ prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos)
+ prevBatch.Decrement()
+ prevBatchStr = prevBatch.String()
+ }
+
+ // We don't include a device here as we don't need to send down
+ // transaction IDs for complete syncs
+ recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents)
+ stateEvents = removeDuplicates(stateEvents, recentEvents)
+ jr = types.NewJoinResponse()
+ jr.Timeline.PrevBatch = prevBatchStr
+ jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+ jr.Timeline.Limited = limited
+ jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
+ return jr, nil
+}
+
func (d *Database) CompleteSync(
ctx context.Context, res *types.Response,
device userapi.Device, numRecentEventsPerRoom int,
) (*types.Response, error) {
toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync(
- ctx, res, device.UserID, numRecentEventsPerRoom,
+ ctx, res, device.UserID, device.ID, numRecentEventsPerRoom,
)
if err != nil {
return nil, fmt.Errorf("d.getResponseWithPDUsForCompleteSync: %w", err)
}
+ // TODO: handle EDUs in peeked rooms
+
// Use a zero value SyncPosition for fromPos so all EDU states are added.
err = d.addEDUDeltaToResponse(
types.NewStreamToken(0, 0, nil), toPos, joinedRoomIDs, res,
@@ -803,6 +883,12 @@ func (d *Database) addRoomDeltaToResponse(
return err
}
+ // XXX: should we ever get this far if we have no recent events or state in this room?
+ // in practice we do for peeks, but possibly not joins?
+ if len(recentEvents) == 0 && len(delta.stateEvents) == 0 {
+ return nil
+ }
+
switch delta.membership {
case gomatrixserverlib.Join:
jr := types.NewJoinResponse()
@@ -812,6 +898,14 @@ func (d *Database) addRoomDeltaToResponse(
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr
+ case gomatrixserverlib.Peek:
+ jr := types.NewJoinResponse()
+
+ jr.Timeline.PrevBatch = prevBatch.String()
+ jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+ jr.Timeline.Limited = limited
+ jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
+ res.Rooms.Peek[delta.roomID] = *jr
case gomatrixserverlib.Leave:
fallthrough // transitions to leave are the same as ban
case gomatrixserverlib.Ban:
@@ -918,6 +1012,7 @@ func (d *Database) fetchMissingStateEvents(
// exclusive of oldPos, inclusive of newPos, for the rooms in which
// the user has new membership events.
// A list of joined room IDs is also returned in case the caller needs it.
+// nolint:gocyclo
func (d *Database) getStateDeltas(
ctx context.Context, device *userapi.Device, txn *sql.Tx,
r types.Range, userID string,
@@ -933,7 +1028,7 @@ func (d *Database) getStateDeltas(
// - Get all CURRENTLY joined rooms, and add them to 'joined' block.
var deltas []stateDelta
- // get all the state events ever between these two positions
+ // get all the state events ever (i.e. for all available rooms) between these two positions
stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter)
if err != nil {
return nil, nil, err
@@ -943,6 +1038,34 @@ func (d *Database) getStateDeltas(
return nil, nil, err
}
+ // find out which rooms this user is peeking, if any.
+ // We do this before joins so any peeks get overwritten
+ peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, device.ID, r)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // add peek blocks
+ for _, peek := range peeks {
+ if peek.New {
+ // send full room state down instead of a delta
+ var s []types.StreamEvent
+ s, err = d.currentStateStreamEventsForRoom(ctx, txn, peek.RoomID, stateFilter)
+ if err != nil {
+ return nil, nil, err
+ }
+ state[peek.RoomID] = s
+ }
+ if !peek.Deleted {
+ deltas = append(deltas, stateDelta{
+ membership: gomatrixserverlib.Peek,
+ stateEvents: d.StreamEventsToEvents(device, state[peek.RoomID]),
+ roomID: peek.RoomID,
+ })
+ }
+ }
+
+ // handle newly joined rooms and non-joined rooms
for roomID, stateStreamEvents := range state {
for _, ev := range stateStreamEvents {
// TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
@@ -993,6 +1116,7 @@ func (d *Database) getStateDeltas(
// requests with full_state=true.
// Fetches full state for all joined rooms and uses selectStateInRange to get
// updates for other rooms.
+// nolint:gocyclo
func (d *Database) getStateDeltasForFullStateSync(
ctx context.Context, device *userapi.Device, txn *sql.Tx,
r types.Range, userID string,
@@ -1001,6 +1125,26 @@ func (d *Database) getStateDeltasForFullStateSync(
// Use a reasonable initial capacity
deltas := make(map[string]stateDelta)
+ peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, device.ID, r)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // Add full states for all peeking rooms
+ for _, peek := range peeks {
+ if !peek.Deleted {
+ s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, peek.RoomID, stateFilter)
+ if stateErr != nil {
+ return nil, nil, stateErr
+ }
+ deltas[peek.RoomID] = stateDelta{
+ membership: gomatrixserverlib.Peek,
+ stateEvents: d.StreamEventsToEvents(device, s),
+ roomID: peek.RoomID,
+ }
+ }
+ }
+
// Get all the state events ever between these two positions
stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter)
if err != nil {