aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorS7evinK <tfaelligen@gmail.com>2020-11-09 19:46:11 +0100
committerGitHub <noreply@github.com>2020-11-09 18:46:11 +0000
commitbcb89ada5ebbe54fa057ec403af4074a8c147764 (patch)
tree283cdbaf04db7fe5bcd25185b40f77c60aa928ca /syncapi
parenteccd0d2c1b8bd4b921bafca4585aa09d32ae561f (diff)
Implement read receipts (#1528)
* fix conversion from int to string yields a string of one rune, not a string of digits * Add receipts table to syncapi * Use StreamingToken as the since value * Add required method to testEDUProducer * Make receipt json creation "easier" to read * Add receipts api to the eduserver * Add receipts endpoint * Add eduserver kafka consumer * Add missing kafka config * Add passing tests to whitelist Signed-off-by: Till Faelligen <tfaelligen@gmail.com> * Fix copy & paste error * Fix column count error * Make outbound federation receipts pass * Make "Inbound federation rejects receipts from wrong remote" pass * Don't use errors package * - Add TODO for batching requests - Rename variable * Return a better error message * - Use OutputReceiptEvent instead of InputReceiptEvent as result - Don't use the errors package for errors - Defer CloseAndLogIfError to close rows - Fix Copyright * Better creation/usage of JoinResponse * Query all joined rooms instead of just one * Update gomatrixserverlib * Add sqlite3 migration * Add postgres migration * Ensure required sequence exists before running migrations * Clarification on comment * - Fix a bug when creating client receipts - Use concrete types instead of interface{} * Remove dead code Use key for timestamp * Fix postgres query... * Remove single purpose struct * Use key/value directly * Only apply receipts on initial sync or if edu positions differ, otherwise we'll be sending the same receipts over and over again. * Actually update the id, so it is correctly send in syncs * Set receipt on request to /read_markers * Fix issue with receipts getting overwritten * Use fmt.Errorf instead of pkg/errors * Revert "Add postgres migration" This reverts commit 722fe5a04628882b787d096942459961db159b06. * Revert "Add sqlite3 migration" This reverts commit d113b03f6495a4b8f8bcf158a3d00b510b4240cc. * Fix selectRoomReceipts query * Make golangci-lint happy Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/eduserver_receipts.go94
-rw-r--r--syncapi/storage/interface.go6
-rw-r--r--syncapi/storage/postgres/receipt_table.go106
-rw-r--r--syncapi/storage/postgres/syncserver.go5
-rw-r--r--syncapi/storage/shared/syncserver.go92
-rw-r--r--syncapi/storage/sqlite3/receipt_table.go118
-rw-r--r--syncapi/storage/sqlite3/syncserver.go5
-rw-r--r--syncapi/storage/tables/interface.go6
-rw-r--r--syncapi/sync/notifier.go10
-rw-r--r--syncapi/syncapi.go7
10 files changed, 442 insertions, 7 deletions
diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go
new file mode 100644
index 00000000..c5d17414
--- /dev/null
+++ b/syncapi/consumers/eduserver_receipts.go
@@ -0,0 +1,94 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/syncapi/types"
+
+ "github.com/Shopify/sarama"
+ "github.com/matrix-org/dendrite/eduserver/api"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/sync"
+ log "github.com/sirupsen/logrus"
+)
+
+// OutputReceiptEventConsumer consumes events that originated in the EDU server.
+type OutputReceiptEventConsumer struct {
+ receiptConsumer *internal.ContinualConsumer
+ db storage.Database
+ notifier *sync.Notifier
+}
+
+// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
+// Call Start() to begin consuming from the EDU server.
+func NewOutputReceiptEventConsumer(
+ cfg *config.SyncAPI,
+ kafkaConsumer sarama.Consumer,
+ n *sync.Notifier,
+ store storage.Database,
+) *OutputReceiptEventConsumer {
+
+ consumer := internal.ContinualConsumer{
+ ComponentName: "syncapi/eduserver/receipt",
+ Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
+ Consumer: kafkaConsumer,
+ PartitionStore: store,
+ }
+
+ s := &OutputReceiptEventConsumer{
+ receiptConsumer: &consumer,
+ db: store,
+ notifier: n,
+ }
+
+ consumer.ProcessMessage = s.onMessage
+
+ return s
+}
+
+// Start consuming from EDU api
+func (s *OutputReceiptEventConsumer) Start() error {
+ return s.receiptConsumer.Start()
+}
+
+func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
+ var output api.OutputReceiptEvent
+ if err := json.Unmarshal(msg.Value, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("EDU server output log: message parse failure")
+ return nil
+ }
+
+ streamPos, err := s.db.StoreReceipt(
+ context.TODO(),
+ output.RoomID,
+ output.Type,
+ output.UserID,
+ output.EventID,
+ output.Timestamp,
+ )
+ if err != nil {
+ return err
+ }
+ // update stream position
+ s.notifier.OnNewReceipt(types.NewStreamToken(0, streamPos, nil))
+
+ return nil
+}
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index e12a1166..727cc048 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -18,6 +18,8 @@ import (
"context"
"time"
+ eduAPI "github.com/matrix-org/dendrite/eduserver/api"
+
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -147,4 +149,8 @@ type Database interface {
PutFilter(ctx context.Context, localpart string, filter *gomatrixserverlib.Filter) (string, error)
// RedactEvent wipes an event in the database and sets the unsigned.redacted_because key to the redaction event
RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error
+ // StoreReceipt stores new receipt events
+ StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
+ // GetRoomReceipts gets all receipts for a given roomID
+ GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error)
}
diff --git a/syncapi/storage/postgres/receipt_table.go b/syncapi/storage/postgres/receipt_table.go
new file mode 100644
index 00000000..c5ec6cbc
--- /dev/null
+++ b/syncapi/storage/postgres/receipt_table.go
@@ -0,0 +1,106 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+
+ "github.com/lib/pq"
+
+ "github.com/matrix-org/dendrite/eduserver/api"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const receiptsSchema = `
+CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id;
+-- Stores data about receipts
+CREATE TABLE IF NOT EXISTS syncapi_receipts (
+ -- The ID
+ id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
+ room_id TEXT NOT NULL,
+ receipt_type TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ event_id TEXT NOT NULL,
+ receipt_ts BIGINT NOT NULL,
+ CONSTRAINT syncapi_receipts_unique UNIQUE (room_id, receipt_type, user_id)
+);
+CREATE INDEX IF NOT EXISTS syncapi_receipts_room_id ON syncapi_receipts(room_id);
+`
+
+const upsertReceipt = "" +
+ "INSERT INTO syncapi_receipts" +
+ " (room_id, receipt_type, user_id, event_id, receipt_ts)" +
+ " VALUES ($1, $2, $3, $4, $5)" +
+ " ON CONFLICT (room_id, receipt_type, user_id)" +
+ " DO UPDATE SET id = nextval('syncapi_stream_id'), event_id = $4, receipt_ts = $5" +
+ " RETURNING id"
+
+const selectRoomReceipts = "" +
+ "SELECT room_id, receipt_type, user_id, event_id, receipt_ts" +
+ " FROM syncapi_receipts" +
+ " WHERE room_id = ANY($1) AND id > $2"
+
+type receiptStatements struct {
+ db *sql.DB
+ upsertReceipt *sql.Stmt
+ selectRoomReceipts *sql.Stmt
+}
+
+func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
+ _, err := db.Exec(receiptsSchema)
+ if err != nil {
+ return nil, err
+ }
+ r := &receiptStatements{
+ db: db,
+ }
+ if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil {
+ return nil, fmt.Errorf("unable to prepare upsertReceipt statement: %w", err)
+ }
+ if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil {
+ return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
+ }
+ return r, nil
+}
+
+func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
+ stmt := sqlutil.TxStmt(txn, r.upsertReceipt)
+ err = stmt.QueryRowContext(ctx, roomId, receiptType, userId, eventId, timestamp).Scan(&pos)
+ return
+}
+
+func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]api.OutputReceiptEvent, error) {
+ rows, err := r.selectRoomReceipts.QueryContext(ctx, pq.Array(roomIDs), streamPos)
+ if err != nil {
+ return nil, fmt.Errorf("unable to query room receipts: %w", err)
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed")
+ var res []api.OutputReceiptEvent
+ for rows.Next() {
+ r := api.OutputReceiptEvent{}
+ err = rows.Scan(&r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp)
+ if err != nil {
+ return res, fmt.Errorf("unable to scan row to api.Receipts: %w", err)
+ }
+ res = append(res, r)
+ }
+ return res, rows.Err()
+}
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
index 7f19722a..979e19a0 100644
--- a/syncapi/storage/postgres/syncserver.go
+++ b/syncapi/storage/postgres/syncserver.go
@@ -82,6 +82,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
if err != nil {
return nil, err
}
+ receipts, err := NewPostgresReceiptsTable(d.db)
+ if err != nil {
+ return nil, err
+ }
d.Database = shared.Database{
DB: d.db,
Writer: d.writer,
@@ -94,6 +98,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
BackwardExtremities: backwardExtremities,
Filter: filter,
SendToDevice: sendToDevice,
+ Receipts: receipts,
EDUCache: cache.New(),
}
return &d, nil
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index a7c07f94..2b82ee33 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -21,6 +21,7 @@ import (
"fmt"
"time"
+ eduAPI "github.com/matrix-org/dendrite/eduserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/eduserver/cache"
@@ -47,6 +48,7 @@ type Database struct {
BackwardExtremities tables.BackwardsExtremities
SendToDevice tables.SendToDevice
Filter tables.Filter
+ Receipts tables.Receipts
EDUCache *cache.EDUCache
}
@@ -527,10 +529,10 @@ func (d *Database) addTypingDeltaToResponse(
joinedRoomIDs []string,
res *types.Response,
) error {
- var jr types.JoinResponse
var ok bool
var err error
for _, roomID := range joinedRoomIDs {
+ var jr types.JoinResponse
if typingUsers, updated := d.EDUCache.GetTypingUsersIfUpdatedAfter(
roomID, int64(since.EDUPosition()),
); updated {
@@ -554,21 +556,84 @@ func (d *Database) addTypingDeltaToResponse(
return nil
}
+// addReceiptDeltaToResponse adds all receipt information to a sync response
+// since the specified position
+func (d *Database) addReceiptDeltaToResponse(
+ since types.StreamingToken,
+ joinedRoomIDs []string,
+ res *types.Response,
+) error {
+ receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.EDUPosition())
+ if err != nil {
+ return fmt.Errorf("unable to select receipts for rooms: %w", err)
+ }
+
+ // Group receipts by room, so we can create one ClientEvent for every room
+ receiptsByRoom := make(map[string][]eduAPI.OutputReceiptEvent)
+ for _, receipt := range receipts {
+ receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt)
+ }
+
+ for roomID, receipts := range receiptsByRoom {
+ var jr types.JoinResponse
+ var ok bool
+
+ // Make sure we use an existing JoinResponse if there is one.
+ // If not, we'll create a new one
+ if jr, ok = res.Rooms.Join[roomID]; !ok {
+ jr = types.JoinResponse{}
+ }
+
+ ev := gomatrixserverlib.ClientEvent{
+ Type: gomatrixserverlib.MReceipt,
+ RoomID: roomID,
+ }
+ content := make(map[string]eduAPI.ReceiptMRead)
+ for _, receipt := range receipts {
+ var read eduAPI.ReceiptMRead
+ if read, ok = content[receipt.EventID]; !ok {
+ read = eduAPI.ReceiptMRead{
+ User: make(map[string]eduAPI.ReceiptTS),
+ }
+ }
+ read.User[receipt.UserID] = eduAPI.ReceiptTS{TS: receipt.Timestamp}
+ content[receipt.EventID] = read
+ }
+ ev.Content, err = json.Marshal(content)
+ if err != nil {
+ return err
+ }
+
+ jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
+ res.Rooms.Join[roomID] = jr
+ }
+
+ return nil
+}
+
// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if
// the positions of that type are not equal in fromPos and toPos.
func (d *Database) addEDUDeltaToResponse(
fromPos, toPos types.StreamingToken,
joinedRoomIDs []string,
res *types.Response,
-) (err error) {
-
+) error {
if fromPos.EDUPosition() != toPos.EDUPosition() {
- err = d.addTypingDeltaToResponse(
- fromPos, joinedRoomIDs, res,
- )
+ // add typing deltas
+ if err := d.addTypingDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil {
+ return fmt.Errorf("unable to apply typing delta to response: %w", err)
+ }
}
- return
+ // Check on initial sync and if EDUPositions differ
+ if (fromPos.EDUPosition() == 0 && toPos.EDUPosition() == 0) ||
+ fromPos.EDUPosition() != toPos.EDUPosition() {
+ if err := d.addReceiptDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil {
+ return fmt.Errorf("unable to apply receipts to response: %w", err)
+ }
+ }
+
+ return nil
}
func (d *Database) GetFilter(
@@ -1404,3 +1469,16 @@ type stateDelta struct {
// Can be 0 if there is no membership event in this delta.
membershipPos types.StreamPosition
}
+
+// StoreReceipt stores user receipts
+func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
+ err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ pos, err = d.Receipts.UpsertReceipt(ctx, txn, roomId, receiptType, userId, eventId, timestamp)
+ return err
+ })
+ return
+}
+
+func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) {
+ return d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos)
+}
diff --git a/syncapi/storage/sqlite3/receipt_table.go b/syncapi/storage/sqlite3/receipt_table.go
new file mode 100644
index 00000000..b1770e80
--- /dev/null
+++ b/syncapi/storage/sqlite3/receipt_table.go
@@ -0,0 +1,118 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "strings"
+
+ "github.com/matrix-org/dendrite/eduserver/api"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const receiptsSchema = `
+-- Stores data about receipts
+CREATE TABLE IF NOT EXISTS syncapi_receipts (
+ -- The ID
+ id BIGINT,
+ room_id TEXT NOT NULL,
+ receipt_type TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ event_id TEXT NOT NULL,
+ receipt_ts BIGINT NOT NULL,
+ CONSTRAINT syncapi_receipts_unique UNIQUE (room_id, receipt_type, user_id)
+);
+CREATE INDEX IF NOT EXISTS syncapi_receipts_room_id_idx ON syncapi_receipts(room_id);
+`
+
+const upsertReceipt = "" +
+ "INSERT INTO syncapi_receipts" +
+ " (id, room_id, receipt_type, user_id, event_id, receipt_ts)" +
+ " VALUES ($1, $2, $3, $4, $5, $6)" +
+ " ON CONFLICT (room_id, receipt_type, user_id)" +
+ " DO UPDATE SET id = $7, event_id = $8, receipt_ts = $9"
+
+const selectRoomReceipts = "" +
+ "SELECT room_id, receipt_type, user_id, event_id, receipt_ts" +
+ " FROM syncapi_receipts" +
+ " WHERE id > $1 and room_id in ($2)"
+
+type receiptStatements struct {
+ db *sql.DB
+ streamIDStatements *streamIDStatements
+ upsertReceipt *sql.Stmt
+ selectRoomReceipts *sql.Stmt
+}
+
+func NewSqliteReceiptsTable(db *sql.DB, streamID *streamIDStatements) (tables.Receipts, error) {
+ _, err := db.Exec(receiptsSchema)
+ if err != nil {
+ return nil, err
+ }
+ r := &receiptStatements{
+ db: db,
+ streamIDStatements: streamID,
+ }
+ if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil {
+ return nil, fmt.Errorf("unable to prepare upsertReceipt statement: %w", err)
+ }
+ if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil {
+ return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
+ }
+ return r, nil
+}
+
+// UpsertReceipt creates new user receipts
+func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
+ pos, err = r.streamIDStatements.nextStreamID(ctx, txn)
+ if err != nil {
+ return
+ }
+ stmt := sqlutil.TxStmt(txn, r.upsertReceipt)
+ _, err = stmt.ExecContext(ctx, pos, roomId, receiptType, userId, eventId, timestamp, pos, eventId, timestamp)
+ return
+}
+
+// SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp
+func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]api.OutputReceiptEvent, error) {
+ selectSQL := strings.Replace(selectRoomReceipts, "($2)", sqlutil.QueryVariadicOffset(len(roomIDs), 1), 1)
+
+ params := make([]interface{}, len(roomIDs)+1)
+ params[0] = streamPos
+ for k, v := range roomIDs {
+ params[k+1] = v
+ }
+ rows, err := r.db.QueryContext(ctx, selectSQL, params...)
+ if err != nil {
+ return nil, fmt.Errorf("unable to query room receipts: %w", err)
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed")
+ var res []api.OutputReceiptEvent
+ for rows.Next() {
+ r := api.OutputReceiptEvent{}
+ err = rows.Scan(&r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp)
+ if err != nil {
+ return res, fmt.Errorf("unable to scan row to api.Receipts: %w", err)
+ }
+ res = append(res, r)
+ }
+ return res, rows.Err()
+}
diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go
index 86d83ec9..036e2b2e 100644
--- a/syncapi/storage/sqlite3/syncserver.go
+++ b/syncapi/storage/sqlite3/syncserver.go
@@ -95,6 +95,10 @@ func (d *SyncServerDatasource) prepare() (err error) {
if err != nil {
return err
}
+ receipts, err := NewSqliteReceiptsTable(d.db, &d.streamID)
+ if err != nil {
+ return err
+ }
d.Database = shared.Database{
DB: d.db,
Writer: d.writer,
@@ -107,6 +111,7 @@ func (d *SyncServerDatasource) prepare() (err error) {
Topology: topology,
Filter: filter,
SendToDevice: sendToDevice,
+ Receipts: receipts,
EDUCache: cache.New(),
}
return nil
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index da095be5..f8e7a224 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -18,6 +18,7 @@ import (
"context"
"database/sql"
+ eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
@@ -156,3 +157,8 @@ type Filter interface {
SelectFilter(ctx context.Context, localpart string, filterID string) (*gomatrixserverlib.Filter, error)
InsertFilter(ctx context.Context, filter *gomatrixserverlib.Filter, localpart string) (filterID string, err error)
}
+
+type Receipts interface {
+ UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
+ SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error)
+}
diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go
index fcac3f16..daa3a1d8 100644
--- a/syncapi/sync/notifier.go
+++ b/syncapi/sync/notifier.go
@@ -149,6 +149,16 @@ func (n *Notifier) OnNewSendToDevice(
n.wakeupUserDevice(userID, deviceIDs, latestPos)
}
+// OnNewReceipt updates the current position
+func (n *Notifier) OnNewReceipt(
+ posUpdate types.StreamingToken,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+ latestPos := n.currPos.WithUpdates(posUpdate)
+ n.currPos = latestPos
+}
+
func (n *Notifier) OnNewKeyChange(
posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string,
) {
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index de0bb434..393a7aa5 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -99,5 +99,12 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
}
+ receiptConsumer := consumers.NewOutputReceiptEventConsumer(
+ cfg, consumer, notifier, syncDB,
+ )
+ if err = receiptConsumer.Start(); err != nil {
+ logrus.WithError(err).Panicf("failed to start receipts consumer")
+ }
+
routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg)
}