diff options
Diffstat (limited to 'syncapi/storage/shared/syncserver.go')
-rw-r--r-- | syncapi/storage/shared/syncserver.go | 92 |
1 files changed, 85 insertions, 7 deletions
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index a7c07f94..2b82ee33 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + eduAPI "github.com/matrix-org/dendrite/eduserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/eduserver/cache" @@ -47,6 +48,7 @@ type Database struct { BackwardExtremities tables.BackwardsExtremities SendToDevice tables.SendToDevice Filter tables.Filter + Receipts tables.Receipts EDUCache *cache.EDUCache } @@ -527,10 +529,10 @@ func (d *Database) addTypingDeltaToResponse( joinedRoomIDs []string, res *types.Response, ) error { - var jr types.JoinResponse var ok bool var err error for _, roomID := range joinedRoomIDs { + var jr types.JoinResponse if typingUsers, updated := d.EDUCache.GetTypingUsersIfUpdatedAfter( roomID, int64(since.EDUPosition()), ); updated { @@ -554,21 +556,84 @@ func (d *Database) addTypingDeltaToResponse( return nil } +// addReceiptDeltaToResponse adds all receipt information to a sync response +// since the specified position +func (d *Database) addReceiptDeltaToResponse( + since types.StreamingToken, + joinedRoomIDs []string, + res *types.Response, +) error { + receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.EDUPosition()) + if err != nil { + return fmt.Errorf("unable to select receipts for rooms: %w", err) + } + + // Group receipts by room, so we can create one ClientEvent for every room + receiptsByRoom := make(map[string][]eduAPI.OutputReceiptEvent) + for _, receipt := range receipts { + receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt) + } + + for roomID, receipts := range receiptsByRoom { + var jr types.JoinResponse + var ok bool + + // Make sure we use an existing JoinResponse if there is one. + // If not, we'll create a new one + if jr, ok = res.Rooms.Join[roomID]; !ok { + jr = types.JoinResponse{} + } + + ev := gomatrixserverlib.ClientEvent{ + Type: gomatrixserverlib.MReceipt, + RoomID: roomID, + } + content := make(map[string]eduAPI.ReceiptMRead) + for _, receipt := range receipts { + var read eduAPI.ReceiptMRead + if read, ok = content[receipt.EventID]; !ok { + read = eduAPI.ReceiptMRead{ + User: make(map[string]eduAPI.ReceiptTS), + } + } + read.User[receipt.UserID] = eduAPI.ReceiptTS{TS: receipt.Timestamp} + content[receipt.EventID] = read + } + ev.Content, err = json.Marshal(content) + if err != nil { + return err + } + + jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) + res.Rooms.Join[roomID] = jr + } + + return nil +} + // addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if // the positions of that type are not equal in fromPos and toPos. func (d *Database) addEDUDeltaToResponse( fromPos, toPos types.StreamingToken, joinedRoomIDs []string, res *types.Response, -) (err error) { - +) error { if fromPos.EDUPosition() != toPos.EDUPosition() { - err = d.addTypingDeltaToResponse( - fromPos, joinedRoomIDs, res, - ) + // add typing deltas + if err := d.addTypingDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil { + return fmt.Errorf("unable to apply typing delta to response: %w", err) + } } - return + // Check on initial sync and if EDUPositions differ + if (fromPos.EDUPosition() == 0 && toPos.EDUPosition() == 0) || + fromPos.EDUPosition() != toPos.EDUPosition() { + if err := d.addReceiptDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil { + return fmt.Errorf("unable to apply receipts to response: %w", err) + } + } + + return nil } func (d *Database) GetFilter( @@ -1404,3 +1469,16 @@ type stateDelta struct { // Can be 0 if there is no membership event in this delta. membershipPos types.StreamPosition } + +// StoreReceipt stores user receipts +func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) { + err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + pos, err = d.Receipts.UpsertReceipt(ctx, txn, roomId, receiptType, userId, eventId, timestamp) + return err + }) + return +} + +func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) { + return d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos) +} |