From bcb89ada5ebbe54fa057ec403af4074a8c147764 Mon Sep 17 00:00:00 2001 From: S7evinK Date: Mon, 9 Nov 2020 19:46:11 +0100 Subject: Implement read receipts (#1528) * fix conversion from int to string yields a string of one rune, not a string of digits * Add receipts table to syncapi * Use StreamingToken as the since value * Add required method to testEDUProducer * Make receipt json creation "easier" to read * Add receipts api to the eduserver * Add receipts endpoint * Add eduserver kafka consumer * Add missing kafka config * Add passing tests to whitelist Signed-off-by: Till Faelligen * Fix copy & paste error * Fix column count error * Make outbound federation receipts pass * Make "Inbound federation rejects receipts from wrong remote" pass * Don't use errors package * - Add TODO for batching requests - Rename variable * Return a better error message * - Use OutputReceiptEvent instead of InputReceiptEvent as result - Don't use the errors package for errors - Defer CloseAndLogIfError to close rows - Fix Copyright * Better creation/usage of JoinResponse * Query all joined rooms instead of just one * Update gomatrixserverlib * Add sqlite3 migration * Add postgres migration * Ensure required sequence exists before running migrations * Clarification on comment * - Fix a bug when creating client receipts - Use concrete types instead of interface{} * Remove dead code Use key for timestamp * Fix postgres query... * Remove single purpose struct * Use key/value directly * Only apply receipts on initial sync or if edu positions differ, otherwise we'll be sending the same receipts over and over again. * Actually update the id, so it is correctly send in syncs * Set receipt on request to /read_markers * Fix issue with receipts getting overwritten * Use fmt.Errorf instead of pkg/errors * Revert "Add postgres migration" This reverts commit 722fe5a04628882b787d096942459961db159b06. * Revert "Add sqlite3 migration" This reverts commit d113b03f6495a4b8f8bcf158a3d00b510b4240cc. * Fix selectRoomReceipts query * Make golangci-lint happy Co-authored-by: Neil Alexander --- syncapi/storage/postgres/receipt_table.go | 106 ++++++++++++++++++++++++++++++ syncapi/storage/postgres/syncserver.go | 5 ++ 2 files changed, 111 insertions(+) create mode 100644 syncapi/storage/postgres/receipt_table.go (limited to 'syncapi/storage/postgres') diff --git a/syncapi/storage/postgres/receipt_table.go b/syncapi/storage/postgres/receipt_table.go new file mode 100644 index 00000000..c5ec6cbc --- /dev/null +++ b/syncapi/storage/postgres/receipt_table.go @@ -0,0 +1,106 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + "fmt" + + "github.com/lib/pq" + + "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/syncapi/storage/tables" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +const receiptsSchema = ` +CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id; +-- Stores data about receipts +CREATE TABLE IF NOT EXISTS syncapi_receipts ( + -- The ID + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), + room_id TEXT NOT NULL, + receipt_type TEXT NOT NULL, + user_id TEXT NOT NULL, + event_id TEXT NOT NULL, + receipt_ts BIGINT NOT NULL, + CONSTRAINT syncapi_receipts_unique UNIQUE (room_id, receipt_type, user_id) +); +CREATE INDEX IF NOT EXISTS syncapi_receipts_room_id ON syncapi_receipts(room_id); +` + +const upsertReceipt = "" + + "INSERT INTO syncapi_receipts" + + " (room_id, receipt_type, user_id, event_id, receipt_ts)" + + " VALUES ($1, $2, $3, $4, $5)" + + " ON CONFLICT (room_id, receipt_type, user_id)" + + " DO UPDATE SET id = nextval('syncapi_stream_id'), event_id = $4, receipt_ts = $5" + + " RETURNING id" + +const selectRoomReceipts = "" + + "SELECT room_id, receipt_type, user_id, event_id, receipt_ts" + + " FROM syncapi_receipts" + + " WHERE room_id = ANY($1) AND id > $2" + +type receiptStatements struct { + db *sql.DB + upsertReceipt *sql.Stmt + selectRoomReceipts *sql.Stmt +} + +func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) { + _, err := db.Exec(receiptsSchema) + if err != nil { + return nil, err + } + r := &receiptStatements{ + db: db, + } + if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil { + return nil, fmt.Errorf("unable to prepare upsertReceipt statement: %w", err) + } + if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil { + return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err) + } + return r, nil +} + +func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) { + stmt := sqlutil.TxStmt(txn, r.upsertReceipt) + err = stmt.QueryRowContext(ctx, roomId, receiptType, userId, eventId, timestamp).Scan(&pos) + return +} + +func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]api.OutputReceiptEvent, error) { + rows, err := r.selectRoomReceipts.QueryContext(ctx, pq.Array(roomIDs), streamPos) + if err != nil { + return nil, fmt.Errorf("unable to query room receipts: %w", err) + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed") + var res []api.OutputReceiptEvent + for rows.Next() { + r := api.OutputReceiptEvent{} + err = rows.Scan(&r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp) + if err != nil { + return res, fmt.Errorf("unable to scan row to api.Receipts: %w", err) + } + res = append(res, r) + } + return res, rows.Err() +} diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 7f19722a..979e19a0 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -82,6 +82,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e if err != nil { return nil, err } + receipts, err := NewPostgresReceiptsTable(d.db) + if err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, Writer: d.writer, @@ -94,6 +98,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e BackwardExtremities: backwardExtremities, Filter: filter, SendToDevice: sendToDevice, + Receipts: receipts, EDUCache: cache.New(), } return &d, nil -- cgit v1.2.3