aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/shared/syncserver.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/shared/syncserver.go')
-rw-r--r--syncapi/storage/shared/syncserver.go92
1 files changed, 85 insertions, 7 deletions
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index a7c07f94..2b82ee33 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -21,6 +21,7 @@ import (
"fmt"
"time"
+ eduAPI "github.com/matrix-org/dendrite/eduserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/eduserver/cache"
@@ -47,6 +48,7 @@ type Database struct {
BackwardExtremities tables.BackwardsExtremities
SendToDevice tables.SendToDevice
Filter tables.Filter
+ Receipts tables.Receipts
EDUCache *cache.EDUCache
}
@@ -527,10 +529,10 @@ func (d *Database) addTypingDeltaToResponse(
joinedRoomIDs []string,
res *types.Response,
) error {
- var jr types.JoinResponse
var ok bool
var err error
for _, roomID := range joinedRoomIDs {
+ var jr types.JoinResponse
if typingUsers, updated := d.EDUCache.GetTypingUsersIfUpdatedAfter(
roomID, int64(since.EDUPosition()),
); updated {
@@ -554,21 +556,84 @@ func (d *Database) addTypingDeltaToResponse(
return nil
}
+// addReceiptDeltaToResponse adds all receipt information to a sync response
+// since the specified position
+func (d *Database) addReceiptDeltaToResponse(
+ since types.StreamingToken,
+ joinedRoomIDs []string,
+ res *types.Response,
+) error {
+ receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.EDUPosition())
+ if err != nil {
+ return fmt.Errorf("unable to select receipts for rooms: %w", err)
+ }
+
+ // Group receipts by room, so we can create one ClientEvent for every room
+ receiptsByRoom := make(map[string][]eduAPI.OutputReceiptEvent)
+ for _, receipt := range receipts {
+ receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt)
+ }
+
+ for roomID, receipts := range receiptsByRoom {
+ var jr types.JoinResponse
+ var ok bool
+
+ // Make sure we use an existing JoinResponse if there is one.
+ // If not, we'll create a new one
+ if jr, ok = res.Rooms.Join[roomID]; !ok {
+ jr = types.JoinResponse{}
+ }
+
+ ev := gomatrixserverlib.ClientEvent{
+ Type: gomatrixserverlib.MReceipt,
+ RoomID: roomID,
+ }
+ content := make(map[string]eduAPI.ReceiptMRead)
+ for _, receipt := range receipts {
+ var read eduAPI.ReceiptMRead
+ if read, ok = content[receipt.EventID]; !ok {
+ read = eduAPI.ReceiptMRead{
+ User: make(map[string]eduAPI.ReceiptTS),
+ }
+ }
+ read.User[receipt.UserID] = eduAPI.ReceiptTS{TS: receipt.Timestamp}
+ content[receipt.EventID] = read
+ }
+ ev.Content, err = json.Marshal(content)
+ if err != nil {
+ return err
+ }
+
+ jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
+ res.Rooms.Join[roomID] = jr
+ }
+
+ return nil
+}
+
// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if
// the positions of that type are not equal in fromPos and toPos.
func (d *Database) addEDUDeltaToResponse(
fromPos, toPos types.StreamingToken,
joinedRoomIDs []string,
res *types.Response,
-) (err error) {
-
+) error {
if fromPos.EDUPosition() != toPos.EDUPosition() {
- err = d.addTypingDeltaToResponse(
- fromPos, joinedRoomIDs, res,
- )
+ // add typing deltas
+ if err := d.addTypingDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil {
+ return fmt.Errorf("unable to apply typing delta to response: %w", err)
+ }
}
- return
+ // Check on initial sync and if EDUPositions differ
+ if (fromPos.EDUPosition() == 0 && toPos.EDUPosition() == 0) ||
+ fromPos.EDUPosition() != toPos.EDUPosition() {
+ if err := d.addReceiptDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil {
+ return fmt.Errorf("unable to apply receipts to response: %w", err)
+ }
+ }
+
+ return nil
}
func (d *Database) GetFilter(
@@ -1404,3 +1469,16 @@ type stateDelta struct {
// Can be 0 if there is no membership event in this delta.
membershipPos types.StreamPosition
}
+
+// StoreReceipt stores user receipts
+func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
+ err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ pos, err = d.Receipts.UpsertReceipt(ctx, txn, roomId, receiptType, userId, eventId, timestamp)
+ return err
+ })
+ return
+}
+
+func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) {
+ return d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos)
+}