aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/sqlite3
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-04-06 13:11:19 +0200
committerGitHub <noreply@github.com>2022-04-06 13:11:19 +0200
commite5e3350ce168a192dfc6b6b654276d5cffbdbf0f (patch)
tree738e3bd364da85767de8c487d3d9a851ab804e8b /syncapi/storage/sqlite3
parent16e2d243fc8f3d433a9d7f428e6f782065dc5e89 (diff)
Add presence module V2 (#2312)
* Syncapi presence * Clientapi http presence handler * Why is this here? * Missing files * FederationAPI presence implementation * Add new presence stream * Pinecone update * Pinecone update * Add passing tests * Make linter happy * Add presence producer * Add presence config option * Set user to unavailable after x minutes * Only set currently_active if online Avoid unneeded presence updates when syncing * Tweaks * Query devices for last_active_ts Fixes & tweaks * Export SharedUsers/SharedUsers * Presence stream in MemoryStorage * Remove status_msg_nil * Fix sytest crashes * Make presence types const and use stringer for it * Change options to allow inbound/outbound presence * Fix option & typo * Update configs Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'syncapi/storage/sqlite3')
-rw-r--r--syncapi/storage/sqlite3/presence_table.go177
-rw-r--r--syncapi/storage/sqlite3/stream_id_table.go8
-rw-r--r--syncapi/storage/sqlite3/syncserver.go5
3 files changed, 190 insertions, 0 deletions
diff --git a/syncapi/storage/sqlite3/presence_table.go b/syncapi/storage/sqlite3/presence_table.go
new file mode 100644
index 00000000..e7b78a70
--- /dev/null
+++ b/syncapi/storage/sqlite3/presence_table.go
@@ -0,0 +1,177 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const presenceSchema = `
+-- Stores data about presence
+CREATE TABLE IF NOT EXISTS syncapi_presence (
+ -- The ID
+ id BIGINT NOT NULL,
+ -- The Matrix user ID
+ user_id TEXT NOT NULL,
+ -- The actual presence
+ presence INT NOT NULL,
+ -- The status message
+ status_msg TEXT,
+ -- The last time an action was received by this user
+ last_active_ts BIGINT NOT NULL,
+ CONSTRAINT presence_presences_unique UNIQUE (user_id)
+);
+CREATE INDEX IF NOT EXISTS syncapi_presence_user_id ON syncapi_presence(user_id);
+`
+
+const upsertPresenceSQL = "" +
+ "INSERT INTO syncapi_presence AS p" +
+ " (id, user_id, presence, status_msg, last_active_ts)" +
+ " VALUES ($1, $2, $3, $4, $5)" +
+ " ON CONFLICT (user_id)" +
+ " DO UPDATE SET id = $6, " +
+ " presence = $7, status_msg = COALESCE($8, p.status_msg), last_active_ts = $9" +
+ " RETURNING id"
+
+const upsertPresenceFromSyncSQL = "" +
+ "INSERT INTO syncapi_presence AS p" +
+ " (id, user_id, presence, last_active_ts)" +
+ " VALUES ($1, $2, $3, $4)" +
+ " ON CONFLICT (user_id)" +
+ " DO UPDATE SET id = $5, " +
+ " presence = $6, last_active_ts = $7" +
+ " RETURNING id"
+
+const selectPresenceForUserSQL = "" +
+ "SELECT presence, status_msg, last_active_ts" +
+ " FROM syncapi_presence" +
+ " WHERE user_id = $1 LIMIT 1"
+
+const selectMaxPresenceSQL = "" +
+ "SELECT COALESCE(MAX(id), 0) FROM syncapi_presence"
+
+const selectPresenceAfter = "" +
+ " SELECT id, user_id, presence, status_msg, last_active_ts" +
+ " FROM syncapi_presence" +
+ " WHERE id > $1"
+
+type presenceStatements struct {
+ db *sql.DB
+ streamIDStatements *streamIDStatements
+ upsertPresenceStmt *sql.Stmt
+ upsertPresenceFromSyncStmt *sql.Stmt
+ selectPresenceForUsersStmt *sql.Stmt
+ selectMaxPresenceStmt *sql.Stmt
+ selectPresenceAfterStmt *sql.Stmt
+}
+
+func NewSqlitePresenceTable(db *sql.DB, streamID *streamIDStatements) (*presenceStatements, error) {
+ _, err := db.Exec(presenceSchema)
+ if err != nil {
+ return nil, err
+ }
+ s := &presenceStatements{
+ db: db,
+ streamIDStatements: streamID,
+ }
+ return s, sqlutil.StatementList{
+ {&s.upsertPresenceStmt, upsertPresenceSQL},
+ {&s.upsertPresenceFromSyncStmt, upsertPresenceFromSyncSQL},
+ {&s.selectPresenceForUsersStmt, selectPresenceForUserSQL},
+ {&s.selectMaxPresenceStmt, selectMaxPresenceSQL},
+ {&s.selectPresenceAfterStmt, selectPresenceAfter},
+ }.Prepare(db)
+}
+
+// UpsertPresence creates/updates a presence status.
+func (p *presenceStatements) UpsertPresence(
+ ctx context.Context,
+ txn *sql.Tx,
+ userID string,
+ statusMsg *string,
+ presence types.Presence,
+ lastActiveTS gomatrixserverlib.Timestamp,
+ fromSync bool,
+) (pos types.StreamPosition, err error) {
+ pos, err = p.streamIDStatements.nextPresenceID(ctx, txn)
+ if err != nil {
+ return pos, err
+ }
+
+ if fromSync {
+ stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt)
+ err = stmt.QueryRowContext(ctx,
+ pos, userID, presence,
+ lastActiveTS, pos,
+ presence, lastActiveTS).Scan(&pos)
+ } else {
+ stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt)
+ err = stmt.QueryRowContext(ctx,
+ pos, userID, presence,
+ statusMsg, lastActiveTS, pos,
+ presence, statusMsg, lastActiveTS).Scan(&pos)
+ }
+ return
+}
+
+// GetPresenceForUser returns the current presence of a user.
+func (p *presenceStatements) GetPresenceForUser(
+ ctx context.Context, txn *sql.Tx,
+ userID string,
+) (*types.PresenceInternal, error) {
+ result := &types.PresenceInternal{
+ UserID: userID,
+ }
+ stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
+ err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
+ result.ClientFields.Presence = result.Presence.String()
+ return result, err
+}
+
+func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
+ stmt := sqlutil.TxStmt(txn, p.selectMaxPresenceStmt)
+ err = stmt.QueryRowContext(ctx).Scan(&pos)
+ return
+}
+
+// GetPresenceAfter returns the changes presences after a given stream id
+func (p *presenceStatements) GetPresenceAfter(
+ ctx context.Context, txn *sql.Tx,
+ after types.StreamPosition,
+) (presences map[string]*types.PresenceInternal, err error) {
+ presences = make(map[string]*types.PresenceInternal)
+ stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt)
+
+ rows, err := stmt.QueryContext(ctx, after)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows")
+ for rows.Next() {
+ qryRes := &types.PresenceInternal{}
+ if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil {
+ return nil, err
+ }
+ qryRes.ClientFields.Presence = qryRes.Presence.String()
+ presences[qryRes.UserID] = qryRes
+ }
+ return presences, rows.Err()
+}
diff --git a/syncapi/storage/sqlite3/stream_id_table.go b/syncapi/storage/sqlite3/stream_id_table.go
index 2be3ae93..faa2c41f 100644
--- a/syncapi/storage/sqlite3/stream_id_table.go
+++ b/syncapi/storage/sqlite3/stream_id_table.go
@@ -24,6 +24,8 @@ INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("accountdata", 0)
ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("invite", 0)
ON CONFLICT DO NOTHING;
+INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("presence", 0)
+ ON CONFLICT DO NOTHING;
`
const increaseStreamIDStmt = "" +
@@ -70,3 +72,9 @@ func (s *streamIDStatements) nextAccountDataID(ctx context.Context, txn *sql.Tx)
err = increaseStmt.QueryRowContext(ctx, "accountdata").Scan(&pos)
return
}
+
+func (s *streamIDStatements) nextPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
+ increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
+ err = increaseStmt.QueryRowContext(ctx, "presence").Scan(&pos)
+ return
+}
diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go
index cb7e3b46..cb2c3169 100644
--- a/syncapi/storage/sqlite3/syncserver.go
+++ b/syncapi/storage/sqlite3/syncserver.go
@@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
if err != nil {
return err
}
+ presence, err := NewSqlitePresenceTable(d.db, &d.streamID)
+ if err != nil {
+ return err
+ }
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
@@ -121,6 +125,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
Receipts: receipts,
Memberships: memberships,
NotificationData: notificationData,
+ Presence: presence,
}
return nil
}