diff options
author | Till <2353100+S7evinK@users.noreply.github.com> | 2022-04-06 13:11:19 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-06 13:11:19 +0200 |
commit | e5e3350ce168a192dfc6b6b654276d5cffbdbf0f (patch) | |
tree | 738e3bd364da85767de8c487d3d9a851ab804e8b /syncapi/storage/sqlite3 | |
parent | 16e2d243fc8f3d433a9d7f428e6f782065dc5e89 (diff) |
Add presence module V2 (#2312)
* Syncapi presence
* Clientapi http presence handler
* Why is this here?
* Missing files
* FederationAPI presence implementation
* Add new presence stream
* Pinecone update
* Pinecone update
* Add passing tests
* Make linter happy
* Add presence producer
* Add presence config option
* Set user to unavailable after x minutes
* Only set currently_active if online
Avoid unneeded presence updates when syncing
* Tweaks
* Query devices for last_active_ts
Fixes & tweaks
* Export SharedUsers/SharedUsers
* Presence stream in MemoryStorage
* Remove status_msg_nil
* Fix sytest crashes
* Make presence types const and use stringer for it
* Change options to allow inbound/outbound presence
* Fix option & typo
* Update configs
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'syncapi/storage/sqlite3')
-rw-r--r-- | syncapi/storage/sqlite3/presence_table.go | 177 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/stream_id_table.go | 8 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/syncserver.go | 5 |
3 files changed, 190 insertions, 0 deletions
diff --git a/syncapi/storage/sqlite3/presence_table.go b/syncapi/storage/sqlite3/presence_table.go new file mode 100644 index 00000000..e7b78a70 --- /dev/null +++ b/syncapi/storage/sqlite3/presence_table.go @@ -0,0 +1,177 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +const presenceSchema = ` +-- Stores data about presence +CREATE TABLE IF NOT EXISTS syncapi_presence ( + -- The ID + id BIGINT NOT NULL, + -- The Matrix user ID + user_id TEXT NOT NULL, + -- The actual presence + presence INT NOT NULL, + -- The status message + status_msg TEXT, + -- The last time an action was received by this user + last_active_ts BIGINT NOT NULL, + CONSTRAINT presence_presences_unique UNIQUE (user_id) +); +CREATE INDEX IF NOT EXISTS syncapi_presence_user_id ON syncapi_presence(user_id); +` + +const upsertPresenceSQL = "" + + "INSERT INTO syncapi_presence AS p" + + " (id, user_id, presence, status_msg, last_active_ts)" + + " VALUES ($1, $2, $3, $4, $5)" + + " ON CONFLICT (user_id)" + + " DO UPDATE SET id = $6, " + + " presence = $7, status_msg = COALESCE($8, p.status_msg), last_active_ts = $9" + + " RETURNING id" + +const upsertPresenceFromSyncSQL = "" + + "INSERT INTO syncapi_presence AS p" + + " (id, user_id, presence, last_active_ts)" + + " VALUES ($1, $2, $3, $4)" + + " ON CONFLICT (user_id)" + + " DO UPDATE SET id = $5, " + + " presence = $6, last_active_ts = $7" + + " RETURNING id" + +const selectPresenceForUserSQL = "" + + "SELECT presence, status_msg, last_active_ts" + + " FROM syncapi_presence" + + " WHERE user_id = $1 LIMIT 1" + +const selectMaxPresenceSQL = "" + + "SELECT COALESCE(MAX(id), 0) FROM syncapi_presence" + +const selectPresenceAfter = "" + + " SELECT id, user_id, presence, status_msg, last_active_ts" + + " FROM syncapi_presence" + + " WHERE id > $1" + +type presenceStatements struct { + db *sql.DB + streamIDStatements *streamIDStatements + upsertPresenceStmt *sql.Stmt + upsertPresenceFromSyncStmt *sql.Stmt + selectPresenceForUsersStmt *sql.Stmt + selectMaxPresenceStmt *sql.Stmt + selectPresenceAfterStmt *sql.Stmt +} + +func NewSqlitePresenceTable(db *sql.DB, streamID *streamIDStatements) (*presenceStatements, error) { + _, err := db.Exec(presenceSchema) + if err != nil { + return nil, err + } + s := &presenceStatements{ + db: db, + streamIDStatements: streamID, + } + return s, sqlutil.StatementList{ + {&s.upsertPresenceStmt, upsertPresenceSQL}, + {&s.upsertPresenceFromSyncStmt, upsertPresenceFromSyncSQL}, + {&s.selectPresenceForUsersStmt, selectPresenceForUserSQL}, + {&s.selectMaxPresenceStmt, selectMaxPresenceSQL}, + {&s.selectPresenceAfterStmt, selectPresenceAfter}, + }.Prepare(db) +} + +// UpsertPresence creates/updates a presence status. +func (p *presenceStatements) UpsertPresence( + ctx context.Context, + txn *sql.Tx, + userID string, + statusMsg *string, + presence types.Presence, + lastActiveTS gomatrixserverlib.Timestamp, + fromSync bool, +) (pos types.StreamPosition, err error) { + pos, err = p.streamIDStatements.nextPresenceID(ctx, txn) + if err != nil { + return pos, err + } + + if fromSync { + stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt) + err = stmt.QueryRowContext(ctx, + pos, userID, presence, + lastActiveTS, pos, + presence, lastActiveTS).Scan(&pos) + } else { + stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt) + err = stmt.QueryRowContext(ctx, + pos, userID, presence, + statusMsg, lastActiveTS, pos, + presence, statusMsg, lastActiveTS).Scan(&pos) + } + return +} + +// GetPresenceForUser returns the current presence of a user. +func (p *presenceStatements) GetPresenceForUser( + ctx context.Context, txn *sql.Tx, + userID string, +) (*types.PresenceInternal, error) { + result := &types.PresenceInternal{ + UserID: userID, + } + stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt) + err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS) + result.ClientFields.Presence = result.Presence.String() + return result, err +} + +func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) { + stmt := sqlutil.TxStmt(txn, p.selectMaxPresenceStmt) + err = stmt.QueryRowContext(ctx).Scan(&pos) + return +} + +// GetPresenceAfter returns the changes presences after a given stream id +func (p *presenceStatements) GetPresenceAfter( + ctx context.Context, txn *sql.Tx, + after types.StreamPosition, +) (presences map[string]*types.PresenceInternal, err error) { + presences = make(map[string]*types.PresenceInternal) + stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt) + + rows, err := stmt.QueryContext(ctx, after) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows") + for rows.Next() { + qryRes := &types.PresenceInternal{} + if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil { + return nil, err + } + qryRes.ClientFields.Presence = qryRes.Presence.String() + presences[qryRes.UserID] = qryRes + } + return presences, rows.Err() +} diff --git a/syncapi/storage/sqlite3/stream_id_table.go b/syncapi/storage/sqlite3/stream_id_table.go index 2be3ae93..faa2c41f 100644 --- a/syncapi/storage/sqlite3/stream_id_table.go +++ b/syncapi/storage/sqlite3/stream_id_table.go @@ -24,6 +24,8 @@ INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("accountdata", 0) ON CONFLICT DO NOTHING; INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("invite", 0) ON CONFLICT DO NOTHING; +INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("presence", 0) + ON CONFLICT DO NOTHING; ` const increaseStreamIDStmt = "" + @@ -70,3 +72,9 @@ func (s *streamIDStatements) nextAccountDataID(ctx context.Context, txn *sql.Tx) err = increaseStmt.QueryRowContext(ctx, "accountdata").Scan(&pos) return } + +func (s *streamIDStatements) nextPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) { + increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt) + err = increaseStmt.QueryRowContext(ctx, "presence").Scan(&pos) + return +} diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index cb7e3b46..cb2c3169 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er if err != nil { return err } + presence, err := NewSqlitePresenceTable(d.db, &d.streamID) + if err != nil { + return err + } m := sqlutil.NewMigrations() deltas.LoadFixSequences(m) deltas.LoadRemoveSendToDeviceSentColumn(m) @@ -121,6 +125,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er Receipts: receipts, Memberships: memberships, NotificationData: notificationData, + Presence: presence, } return nil } |