diff options
Diffstat (limited to 'syncapi/storage/shared/syncserver.go')
-rw-r--r-- | syncapi/storage/shared/syncserver.go | 224 |
1 files changed, 184 insertions, 40 deletions
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 6267dd3a..94580adb 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -30,7 +30,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" ) // Database is a temporary struct until we have made syncserver.go the same for both pq/sqlite @@ -39,6 +39,7 @@ type Database struct { DB *sql.DB Writer sqlutil.Writer Invites tables.Invites + Peeks tables.Peeks AccountData tables.AccountData OutputEvents tables.Events Topology tables.Topology @@ -120,6 +121,10 @@ func (d *Database) AllJoinedUsersInRooms(ctx context.Context) (map[string][]stri return d.CurrentRoomState.SelectJoinedUsers(ctx) } +func (d *Database) AllPeekingDevicesInRooms(ctx context.Context) (map[string][]types.PeekingDevice, error) { + return d.Peeks.SelectPeekingDevices(ctx) +} + func (d *Database) GetStateEvent( ctx context.Context, roomID, evType, stateKey string, ) (*gomatrixserverlib.HeaderedEvent, error) { @@ -141,7 +146,7 @@ func (d *Database) AddInviteEvent( ) (sp types.StreamPosition, err error) { _ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { sp, err = d.Invites.InsertInviteEvent(ctx, txn, inviteEvent) - return nil + return err }) return } @@ -153,11 +158,41 @@ func (d *Database) RetireInviteEvent( ) (sp types.StreamPosition, err error) { _ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { sp, err = d.Invites.DeleteInviteEvent(ctx, txn, inviteEventID) - return nil + return err }) return } +// AddPeek tracks the fact that a user has started peeking. +// If the peek was successfully stored this returns the stream ID it was stored at. +// Returns an error if there was a problem communicating with the database. +func (d *Database) AddPeek( + ctx context.Context, roomID, userID, deviceID string, +) (sp types.StreamPosition, err error) { + err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + sp, err = d.Peeks.InsertPeek(ctx, txn, roomID, userID, deviceID) + return err + }) + return +} + +// DeletePeeks tracks the fact that a user has stopped peeking from all devices +// If the peeks was successfully deleted this returns the stream ID it was stored at. +// Returns an error if there was a problem communicating with the database. +func (d *Database) DeletePeeks( + ctx context.Context, roomID, userID string, +) (sp types.StreamPosition, err error) { + err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + sp, err = d.Peeks.DeletePeeks(ctx, txn, roomID, userID) + return err + }) + if err == sql.ErrNoRows { + sp = 0 + err = nil + } + return +} + // GetAccountDataInRange returns all account data for a given user inserted or // updated between two given positions // Returns a map following the format data[roomID] = []dataTypes @@ -196,7 +231,7 @@ func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.Strea "transaction_id", in[i].TransactionID.TransactionID, ) if err != nil { - logrus.WithFields(logrus.Fields{ + log.WithFields(log.Fields{ "event_id": out[i].EventID(), }).WithError(err).Warnf("Failed to add transaction ID to event") } @@ -389,7 +424,6 @@ func (d *Database) EventPositionInTopology( func (d *Database) syncPositionTx( ctx context.Context, txn *sql.Tx, ) (sp types.StreamingToken, err error) { - maxEventID, err := d.OutputEvents.SelectMaxEventID(ctx, txn) if err != nil { return sp, err @@ -408,6 +442,13 @@ func (d *Database) syncPositionTx( if maxInviteID > maxEventID { maxEventID = maxInviteID } + maxPeekID, err := d.Peeks.SelectMaxPeekID(ctx, txn) + if err != nil { + return sp, err + } + if maxPeekID > maxEventID { + maxEventID = maxPeekID + } sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.EDUCache.GetLatestSyncPosition()), nil) return } @@ -566,6 +607,8 @@ func (d *Database) IncrementalSync( } } + // TODO: handle EDUs in peeked rooms + err = d.addEDUDeltaToResponse( fromPos, toPos, joinedRoomIDs, res, ) @@ -582,7 +625,7 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda return err } if len(redactedEvents) == 0 { - logrus.WithField("event_id", redactedEventID).WithField("redaction_event", redactedBecause.EventID()).Warnf("missing redacted event for redaction") + log.WithField("event_id", redactedEventID).WithField("redaction_event", redactedBecause.EventID()).Warnf("missing redacted event for redaction") return nil } eventToRedact := redactedEvents[0].Unwrap() @@ -604,7 +647,7 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda // nolint:nakedret func (d *Database) getResponseWithPDUsForCompleteSync( ctx context.Context, res *types.Response, - userID string, + userID string, deviceID string, numRecentEventsPerRoom int, ) ( toPos types.StreamingToken, @@ -644,46 +687,32 @@ func (d *Database) getResponseWithPDUsForCompleteSync( // Build up a /sync response. Add joined rooms. for _, roomID := range joinedRoomIDs { - var stateEvents []gomatrixserverlib.HeaderedEvent - stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, &stateFilter) - if err != nil { - return - } - // TODO: When filters are added, we may need to call this multiple times to get enough events. - // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 - var recentStreamEvents []types.StreamEvent - var limited bool - recentStreamEvents, limited, err = d.OutputEvents.SelectRecentEvents( - ctx, txn, roomID, r, numRecentEventsPerRoom, true, true, + var jr *types.JoinResponse + jr, err = d.getJoinResponseForCompleteSync( + ctx, txn, roomID, r, &stateFilter, numRecentEventsPerRoom, ) if err != nil { return } + res.Rooms.Join[roomID] = *jr + } - // Retrieve the backward topology position, i.e. the position of the - // oldest event in the room's topology. - var prevBatchStr string - if len(recentStreamEvents) > 0 { - var backwardTopologyPos, backwardStreamPos types.StreamPosition - backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID()) + // Add peeked rooms. + peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, deviceID, r) + if err != nil { + return + } + for _, peek := range peeks { + if !peek.Deleted { + var jr *types.JoinResponse + jr, err = d.getJoinResponseForCompleteSync( + ctx, txn, peek.RoomID, r, &stateFilter, numRecentEventsPerRoom, + ) if err != nil { return } - prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos) - prevBatch.Decrement() - prevBatchStr = prevBatch.String() + res.Rooms.Peek[peek.RoomID] = *jr } - - // We don't include a device here as we don't need to send down - // transaction IDs for complete syncs - recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents) - stateEvents = removeDuplicates(stateEvents, recentEvents) - jr := types.NewJoinResponse() - jr.Timeline.PrevBatch = prevBatchStr - jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = limited - jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Join[roomID] = *jr } if err = d.addInvitesToResponse(ctx, txn, userID, r, res); err != nil { @@ -694,17 +723,68 @@ func (d *Database) getResponseWithPDUsForCompleteSync( return //res, toPos, joinedRoomIDs, err } +func (d *Database) getJoinResponseForCompleteSync( + ctx context.Context, txn *sql.Tx, + roomID string, + r types.Range, + stateFilter *gomatrixserverlib.StateFilter, + numRecentEventsPerRoom int, +) (jr *types.JoinResponse, err error) { + var stateEvents []gomatrixserverlib.HeaderedEvent + stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter) + if err != nil { + return + } + // TODO: When filters are added, we may need to call this multiple times to get enough events. + // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 + var recentStreamEvents []types.StreamEvent + var limited bool + recentStreamEvents, limited, err = d.OutputEvents.SelectRecentEvents( + ctx, txn, roomID, r, numRecentEventsPerRoom, true, true, + ) + if err != nil { + return + } + + // Retrieve the backward topology position, i.e. the position of the + // oldest event in the room's topology. + var prevBatchStr string + if len(recentStreamEvents) > 0 { + var backwardTopologyPos, backwardStreamPos types.StreamPosition + backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID()) + if err != nil { + return + } + prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos) + prevBatch.Decrement() + prevBatchStr = prevBatch.String() + } + + // We don't include a device here as we don't need to send down + // transaction IDs for complete syncs + recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents) + stateEvents = removeDuplicates(stateEvents, recentEvents) + jr = types.NewJoinResponse() + jr.Timeline.PrevBatch = prevBatchStr + jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = limited + jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync) + return jr, nil +} + func (d *Database) CompleteSync( ctx context.Context, res *types.Response, device userapi.Device, numRecentEventsPerRoom int, ) (*types.Response, error) { toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync( - ctx, res, device.UserID, numRecentEventsPerRoom, + ctx, res, device.UserID, device.ID, numRecentEventsPerRoom, ) if err != nil { return nil, fmt.Errorf("d.getResponseWithPDUsForCompleteSync: %w", err) } + // TODO: handle EDUs in peeked rooms + // Use a zero value SyncPosition for fromPos so all EDU states are added. err = d.addEDUDeltaToResponse( types.NewStreamToken(0, 0, nil), toPos, joinedRoomIDs, res, @@ -803,6 +883,12 @@ func (d *Database) addRoomDeltaToResponse( return err } + // XXX: should we ever get this far if we have no recent events or state in this room? + // in practice we do for peeks, but possibly not joins? + if len(recentEvents) == 0 && len(delta.stateEvents) == 0 { + return nil + } + switch delta.membership { case gomatrixserverlib.Join: jr := types.NewJoinResponse() @@ -812,6 +898,14 @@ func (d *Database) addRoomDeltaToResponse( jr.Timeline.Limited = limited jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[delta.roomID] = *jr + case gomatrixserverlib.Peek: + jr := types.NewJoinResponse() + + jr.Timeline.PrevBatch = prevBatch.String() + jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = limited + jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Peek[delta.roomID] = *jr case gomatrixserverlib.Leave: fallthrough // transitions to leave are the same as ban case gomatrixserverlib.Ban: @@ -918,6 +1012,7 @@ func (d *Database) fetchMissingStateEvents( // exclusive of oldPos, inclusive of newPos, for the rooms in which // the user has new membership events. // A list of joined room IDs is also returned in case the caller needs it. +// nolint:gocyclo func (d *Database) getStateDeltas( ctx context.Context, device *userapi.Device, txn *sql.Tx, r types.Range, userID string, @@ -933,7 +1028,7 @@ func (d *Database) getStateDeltas( // - Get all CURRENTLY joined rooms, and add them to 'joined' block. var deltas []stateDelta - // get all the state events ever between these two positions + // get all the state events ever (i.e. for all available rooms) between these two positions stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter) if err != nil { return nil, nil, err @@ -943,6 +1038,34 @@ func (d *Database) getStateDeltas( return nil, nil, err } + // find out which rooms this user is peeking, if any. + // We do this before joins so any peeks get overwritten + peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, device.ID, r) + if err != nil { + return nil, nil, err + } + + // add peek blocks + for _, peek := range peeks { + if peek.New { + // send full room state down instead of a delta + var s []types.StreamEvent + s, err = d.currentStateStreamEventsForRoom(ctx, txn, peek.RoomID, stateFilter) + if err != nil { + return nil, nil, err + } + state[peek.RoomID] = s + } + if !peek.Deleted { + deltas = append(deltas, stateDelta{ + membership: gomatrixserverlib.Peek, + stateEvents: d.StreamEventsToEvents(device, state[peek.RoomID]), + roomID: peek.RoomID, + }) + } + } + + // handle newly joined rooms and non-joined rooms for roomID, stateStreamEvents := range state { for _, ev := range stateStreamEvents { // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event. @@ -993,6 +1116,7 @@ func (d *Database) getStateDeltas( // requests with full_state=true. // Fetches full state for all joined rooms and uses selectStateInRange to get // updates for other rooms. +// nolint:gocyclo func (d *Database) getStateDeltasForFullStateSync( ctx context.Context, device *userapi.Device, txn *sql.Tx, r types.Range, userID string, @@ -1001,6 +1125,26 @@ func (d *Database) getStateDeltasForFullStateSync( // Use a reasonable initial capacity deltas := make(map[string]stateDelta) + peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, device.ID, r) + if err != nil { + return nil, nil, err + } + + // Add full states for all peeking rooms + for _, peek := range peeks { + if !peek.Deleted { + s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, peek.RoomID, stateFilter) + if stateErr != nil { + return nil, nil, stateErr + } + deltas[peek.RoomID] = stateDelta{ + membership: gomatrixserverlib.Peek, + stateEvents: d.StreamEventsToEvents(device, s), + roomID: peek.RoomID, + } + } + } + // Get all the state events ever between these two positions stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter) if err != nil { |