aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage')
-rw-r--r--syncapi/storage/postgres/memberships_table.go111
-rw-r--r--syncapi/storage/postgres/output_room_events_topology_table.go9
-rw-r--r--syncapi/storage/postgres/syncserver.go5
-rw-r--r--syncapi/storage/shared/syncserver.go11
-rw-r--r--syncapi/storage/sqlite3/memberships_table.go119
-rw-r--r--syncapi/storage/sqlite3/output_room_events_topology_table.go7
-rw-r--r--syncapi/storage/sqlite3/syncserver.go5
-rw-r--r--syncapi/storage/tables/interface.go7
8 files changed, 262 insertions, 12 deletions
diff --git a/syncapi/storage/postgres/memberships_table.go b/syncapi/storage/postgres/memberships_table.go
new file mode 100644
index 00000000..6566544d
--- /dev/null
+++ b/syncapi/storage/postgres/memberships_table.go
@@ -0,0 +1,111 @@
+// Copyright 2021 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// The memberships table is designed to track the last time that
+// the user was a given state. This allows us to find out the
+// most recent time that a user was invited to, joined or left
+// a room, either by choice or otherwise. This is important for
+// building history visibility.
+
+const membershipsSchema = `
+CREATE TABLE IF NOT EXISTS syncapi_memberships (
+ -- The 'room_id' key for the state event.
+ room_id TEXT NOT NULL,
+ -- The state event ID
+ user_id TEXT NOT NULL,
+ -- The status of the membership
+ membership TEXT NOT NULL,
+ -- The event ID that last changed the membership
+ event_id TEXT NOT NULL,
+ -- The stream position of the change
+ stream_pos BIGINT NOT NULL,
+ -- The topological position of the change in the room
+ topological_pos BIGINT NOT NULL,
+ -- Unique index
+ CONSTRAINT syncapi_memberships_unique UNIQUE (room_id, user_id, membership)
+);
+`
+
+const upsertMembershipSQL = "" +
+ "INSERT INTO syncapi_memberships (room_id, user_id, membership, event_id, stream_pos, topological_pos)" +
+ " VALUES ($1, $2, $3, $4, $5, $6)" +
+ " ON CONFLICT ON CONSTRAINT syncapi_memberships_unique" +
+ " DO UPDATE SET event_id = $4, stream_pos = $5, topological_pos = $6"
+
+const selectMembershipSQL = "" +
+ "SELECT event_id, stream_pos, topological_pos FROM syncapi_memberships" +
+ " WHERE room_id = $1 AND user_id = $2 AND membership = ANY($3)" +
+ " ORDER BY stream_pos DESC" +
+ " LIMIT 1"
+
+type membershipsStatements struct {
+ upsertMembershipStmt *sql.Stmt
+ selectMembershipStmt *sql.Stmt
+}
+
+func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
+ s := &membershipsStatements{}
+ _, err := db.Exec(membershipsSchema)
+ if err != nil {
+ return nil, err
+ }
+ if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
+ return nil, err
+ }
+ if s.selectMembershipStmt, err = db.Prepare(selectMembershipSQL); err != nil {
+ return nil, err
+ }
+ return s, nil
+}
+
+func (s *membershipsStatements) UpsertMembership(
+ ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
+ streamPos, topologicalPos types.StreamPosition,
+) error {
+ membership, err := event.Membership()
+ if err != nil {
+ return fmt.Errorf("event.Membership: %w", err)
+ }
+ _, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext(
+ ctx,
+ event.RoomID(),
+ *event.StateKey(),
+ membership,
+ event.EventID(),
+ streamPos,
+ topologicalPos,
+ )
+ return err
+}
+
+func (s *membershipsStatements) SelectMembership(
+ ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string,
+) (eventID string, streamPos, topologyPos types.StreamPosition, err error) {
+ stmt := sqlutil.TxStmt(txn, s.selectMembershipStmt)
+ err = stmt.QueryRowContext(ctx, roomID, userID, memberships).Scan(&eventID, &streamPos, &topologyPos)
+ return
+}
diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go
index cbd20a07..57774453 100644
--- a/syncapi/storage/postgres/output_room_events_topology_table.go
+++ b/syncapi/storage/postgres/output_room_events_topology_table.go
@@ -44,7 +44,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON sync
const insertEventInTopologySQL = "" +
"INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id, stream_position)" +
" VALUES ($1, $2, $3, $4)" +
- " ON CONFLICT (topological_position, stream_position, room_id) DO UPDATE SET event_id = $1"
+ " ON CONFLICT (topological_position, stream_position, room_id) DO UPDATE SET event_id = $1" +
+ " RETURNING topological_position"
const selectEventIDsInRangeASCSQL = "" +
"SELECT event_id FROM syncapi_output_room_events_topology" +
@@ -115,10 +116,10 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
// on the event's depth.
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
-) (err error) {
- _, err = s.insertEventInTopologyStmt.ExecContext(
+) (topoPos types.StreamPosition, err error) {
+ err = sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).QueryRowContext(
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
- )
+ ).Scan(&topoPos)
return
}
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
index 0fbf3c23..a69fda4f 100644
--- a/syncapi/storage/postgres/syncserver.go
+++ b/syncapi/storage/postgres/syncserver.go
@@ -87,6 +87,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
if err != nil {
return nil, err
}
+ memberships, err := NewPostgresMembershipsTable(d.db)
+ if err != nil {
+ return nil, err
+ }
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
@@ -106,6 +110,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
Filter: filter,
SendToDevice: sendToDevice,
Receipts: receipts,
+ Memberships: memberships,
}
return &d, nil
}
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index 9df07693..239f6812 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -48,6 +48,7 @@ type Database struct {
SendToDevice tables.SendToDevice
Filter tables.Filter
Receipts tables.Receipts
+ Memberships tables.Memberships
}
func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
@@ -383,8 +384,8 @@ func (d *Database) WriteEvent(
return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err)
}
pduPosition = pos
-
- if err = d.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil {
+ var topoPosition types.StreamPosition
+ if topoPosition, err = d.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil {
return fmt.Errorf("d.Topology.InsertEventInTopology: %w", err)
}
@@ -397,7 +398,7 @@ func (d *Database) WriteEvent(
return nil
}
- return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition)
+ return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition, topoPosition)
})
return pduPosition, returnErr
@@ -409,6 +410,7 @@ func (d *Database) updateRoomState(
removedEventIDs []string,
addedEvents []*gomatrixserverlib.HeaderedEvent,
pduPosition types.StreamPosition,
+ topoPosition types.StreamPosition,
) error {
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
for _, eventID := range removedEventIDs {
@@ -429,6 +431,9 @@ func (d *Database) updateRoomState(
return fmt.Errorf("event.Membership: %w", err)
}
membership = &value
+ if err = d.Memberships.UpsertMembership(ctx, txn, event, pduPosition, topoPosition); err != nil {
+ return fmt.Errorf("d.Memberships.UpsertMembership: %w", err)
+ }
}
if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil {
diff --git a/syncapi/storage/sqlite3/memberships_table.go b/syncapi/storage/sqlite3/memberships_table.go
new file mode 100644
index 00000000..e5445e81
--- /dev/null
+++ b/syncapi/storage/sqlite3/memberships_table.go
@@ -0,0 +1,119 @@
+// Copyright 2021 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "strings"
+
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// The memberships table is designed to track the last time that
+// the user was a given state. This allows us to find out the
+// most recent time that a user was invited to, joined or left
+// a room, either by choice or otherwise. This is important for
+// building history visibility.
+
+const membershipsSchema = `
+CREATE TABLE IF NOT EXISTS syncapi_memberships (
+ -- The 'room_id' key for the state event.
+ room_id TEXT NOT NULL,
+ -- The state event ID
+ user_id TEXT NOT NULL,
+ -- The status of the membership
+ membership TEXT NOT NULL,
+ -- The event ID that last changed the membership
+ event_id TEXT NOT NULL,
+ -- The stream position of the change
+ stream_pos BIGINT NOT NULL,
+ -- The topological position of the change in the room
+ topological_pos BIGINT NOT NULL,
+ -- Unique index
+ UNIQUE (room_id, user_id, membership)
+);
+`
+
+const upsertMembershipSQL = "" +
+ "INSERT INTO syncapi_memberships (room_id, user_id, membership, event_id, stream_pos, topological_pos)" +
+ " VALUES ($1, $2, $3, $4, $5, $6)" +
+ " ON CONFLICT (room_id, user_id, membership)" +
+ " DO UPDATE SET event_id = $4, stream_pos = $5, topological_pos = $6"
+
+const selectMembershipSQL = "" +
+ "SELECT event_id, stream_pos, topological_pos FROM syncapi_memberships" +
+ " WHERE room_id = $1 AND user_id = $2 AND membership IN ($3)" +
+ " ORDER BY stream_pos DESC" +
+ " LIMIT 1"
+
+type membershipsStatements struct {
+ db *sql.DB
+ upsertMembershipStmt *sql.Stmt
+}
+
+func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
+ s := &membershipsStatements{
+ db: db,
+ }
+ _, err := db.Exec(membershipsSchema)
+ if err != nil {
+ return nil, err
+ }
+ if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
+ return nil, err
+ }
+ return s, nil
+}
+
+func (s *membershipsStatements) UpsertMembership(
+ ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
+ streamPos, topologicalPos types.StreamPosition,
+) error {
+ membership, err := event.Membership()
+ if err != nil {
+ return fmt.Errorf("event.Membership: %w", err)
+ }
+ _, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext(
+ ctx,
+ event.RoomID(),
+ *event.StateKey(),
+ membership,
+ event.EventID(),
+ streamPos,
+ topologicalPos,
+ )
+ return err
+}
+
+func (s *membershipsStatements) SelectMembership(
+ ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string,
+) (eventID string, streamPos, topologyPos types.StreamPosition, err error) {
+ params := []interface{}{roomID, userID}
+ for _, membership := range memberships {
+ params = append(params, membership)
+ }
+ orig := strings.Replace(selectMembershipSQL, "($3)", sqlutil.QueryVariadicOffset(len(memberships), 2), 1)
+ stmt, err := s.db.Prepare(orig)
+ if err != nil {
+ return "", 0, 0, err
+ }
+ err = sqlutil.TxStmt(txn, stmt).QueryRowContext(ctx, params...).Scan(&eventID, &streamPos, &topologyPos)
+ return
+}
diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go
index d3ba9af6..d34b9050 100644
--- a/syncapi/storage/sqlite3/output_room_events_topology_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go
@@ -111,12 +111,11 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
// on the event's depth.
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
-) (err error) {
- stmt := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt)
- _, err = stmt.ExecContext(
+) (types.StreamPosition, error) {
+ _, err := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).ExecContext(
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
)
- return
+ return types.StreamPosition(event.Depth()), err
}
func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go
index fdb6ce4f..b0e43b68 100644
--- a/syncapi/storage/sqlite3/syncserver.go
+++ b/syncapi/storage/sqlite3/syncserver.go
@@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
if err != nil {
return err
}
+ memberships, err := NewSqliteMembershipsTable(d.db)
+ if err != nil {
+ return err
+ }
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
@@ -119,6 +123,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
Filter: filter,
SendToDevice: sendToDevice,
Receipts: receipts,
+ Memberships: memberships,
}
return nil
}
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 73967677..997486dd 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -70,7 +70,7 @@ type Events interface {
type Topology interface {
// InsertEventInTopology inserts the given event in the room's topology, based on the event's depth.
// `pos` is the stream position of this event in the events table, and is used to order events which have the same depth.
- InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (err error)
+ InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (topoPos types.StreamPosition, err error)
// SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order.
// Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`.
// `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned.
@@ -162,3 +162,8 @@ type Receipts interface {
SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error)
}
+
+type Memberships interface {
+ UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
+ SelectMembership(ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string) (eventID string, streamPos, topologyPos types.StreamPosition, err error)
+}