diff options
Diffstat (limited to 'syncapi/storage/postgres')
-rw-r--r-- | syncapi/storage/postgres/account_data_table.go | 149 | ||||
-rw-r--r-- | syncapi/storage/postgres/current_room_state_table.go | 285 | ||||
-rw-r--r-- | syncapi/storage/postgres/filtering.go | 36 | ||||
-rw-r--r-- | syncapi/storage/postgres/invites_table.go | 148 | ||||
-rw-r--r-- | syncapi/storage/postgres/output_room_events_table.go | 328 | ||||
-rw-r--r-- | syncapi/storage/postgres/syncserver.go | 948 |
6 files changed, 1894 insertions, 0 deletions
diff --git a/syncapi/storage/postgres/account_data_table.go b/syncapi/storage/postgres/account_data_table.go new file mode 100644 index 00000000..33cfffad --- /dev/null +++ b/syncapi/storage/postgres/account_data_table.go @@ -0,0 +1,149 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/lib/pq" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrixserverlib" +) + +const accountDataSchema = ` +-- This sequence is shared between all the tables generated from kafka logs. +CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id; + +-- Stores the types of account data that a user set has globally and in each room +-- and the stream ID when that type was last updated. +CREATE TABLE IF NOT EXISTS syncapi_account_data_type ( + -- An incrementing ID which denotes the position in the log that this event resides at. + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), + -- ID of the user the data belongs to + user_id TEXT NOT NULL, + -- ID of the room the data is related to (empty string if not related to a specific room) + room_id TEXT NOT NULL, + -- Type of the data + type TEXT NOT NULL, + + -- We don't want two entries of the same type for the same user + CONSTRAINT syncapi_account_data_unique UNIQUE (user_id, room_id, type) +); + +CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account_data_type(id, type); +` + +const insertAccountDataSQL = "" + + "INSERT INTO syncapi_account_data_type (user_id, room_id, type) VALUES ($1, $2, $3)" + + " ON CONFLICT ON CONSTRAINT syncapi_account_data_unique" + + " DO UPDATE SET id = EXCLUDED.id" + + " RETURNING id" + +const selectAccountDataInRangeSQL = "" + + "SELECT room_id, type FROM syncapi_account_data_type" + + " WHERE user_id = $1 AND id > $2 AND id <= $3" + + " AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" + + " AND ( $5::text[] IS NULL OR NOT(type LIKE ANY($5)) )" + + " ORDER BY id ASC LIMIT $6" + +const selectMaxAccountDataIDSQL = "" + + "SELECT MAX(id) FROM syncapi_account_data_type" + +type accountDataStatements struct { + insertAccountDataStmt *sql.Stmt + selectAccountDataInRangeStmt *sql.Stmt + selectMaxAccountDataIDStmt *sql.Stmt +} + +func (s *accountDataStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(accountDataSchema) + if err != nil { + return + } + if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil { + return + } + if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil { + return + } + if s.selectMaxAccountDataIDStmt, err = db.Prepare(selectMaxAccountDataIDSQL); err != nil { + return + } + return +} + +func (s *accountDataStatements) insertAccountData( + ctx context.Context, + userID, roomID, dataType string, +) (pos int64, err error) { + err = s.insertAccountDataStmt.QueryRowContext(ctx, userID, roomID, dataType).Scan(&pos) + return +} + +func (s *accountDataStatements) selectAccountDataInRange( + ctx context.Context, + userID string, + oldPos, newPos int64, + accountDataFilterPart *gomatrixserverlib.FilterPart, +) (data map[string][]string, err error) { + data = make(map[string][]string) + + // If both positions are the same, it means that the data was saved after the + // latest room event. In that case, we need to decrement the old position as + // it would prevent the SQL request from returning anything. + if oldPos == newPos { + oldPos-- + } + + rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos, + pq.StringArray(filterConvertTypeWildcardToSQL(accountDataFilterPart.Types)), + pq.StringArray(filterConvertTypeWildcardToSQL(accountDataFilterPart.NotTypes)), + accountDataFilterPart.Limit, + ) + if err != nil { + return + } + + for rows.Next() { + var dataType string + var roomID string + + if err = rows.Scan(&roomID, &dataType); err != nil { + return + } + + if len(data[roomID]) > 0 { + data[roomID] = append(data[roomID], dataType) + } else { + data[roomID] = []string{dataType} + } + } + + return +} + +func (s *accountDataStatements) selectMaxAccountDataID( + ctx context.Context, txn *sql.Tx, +) (id int64, err error) { + var nullableID sql.NullInt64 + stmt := common.TxStmt(txn, s.selectMaxAccountDataIDStmt) + err = stmt.QueryRowContext(ctx).Scan(&nullableID) + if nullableID.Valid { + id = nullableID.Int64 + } + return +} diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go new file mode 100644 index 00000000..dbfa111b --- /dev/null +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -0,0 +1,285 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + "encoding/json" + + "github.com/lib/pq" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrixserverlib" +) + +const currentRoomStateSchema = ` +-- Stores the current room state for every room. +CREATE TABLE IF NOT EXISTS syncapi_current_room_state ( + -- The 'room_id' key for the state event. + room_id TEXT NOT NULL, + -- The state event ID + event_id TEXT NOT NULL, + -- The state event type e.g 'm.room.member' + type TEXT NOT NULL, + -- The 'sender' property of the event. + sender TEXT NOT NULL, + -- true if the event content contains a url key + contains_url BOOL NOT NULL, + -- The state_key value for this state event e.g '' + state_key TEXT NOT NULL, + -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. + event_json TEXT NOT NULL, + -- The 'content.membership' value if this event is an m.room.member event. For other + -- events, this will be NULL. + membership TEXT, + -- The serial ID of the output_room_events table when this event became + -- part of the current state of the room. + added_at BIGINT, + -- Clobber based on 3-uple of room_id, type and state_key + CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key) +); +-- for event deletion +CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id, room_id, type, sender, contains_url); +-- for querying membership states of users +CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave'; +` + +const upsertRoomStateSQL = "" + + "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, event_json, membership, added_at)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" + + " ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" + + " DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, event_json = $7, membership = $8, added_at = $9" + +const deleteRoomStateByEventIDSQL = "" + + "DELETE FROM syncapi_current_room_state WHERE event_id = $1" + +const selectRoomIDsWithMembershipSQL = "" + + "SELECT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2" + +const selectCurrentStateSQL = "" + + "SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1" + + " AND ( $2::text[] IS NULL OR sender = ANY($2) )" + + " AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" + + " AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" + + " AND ( $5::text[] IS NULL OR NOT(type LIKE ANY($5)) )" + + " AND ( $6::bool IS NULL OR contains_url = $6 )" + + " LIMIT $7" + +const selectJoinedUsersSQL = "" + + "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'" + +const selectStateEventSQL = "" + + "SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3" + +const selectEventsWithEventIDsSQL = "" + + // TODO: The session_id and transaction_id blanks are here because otherwise + // the rowsToStreamEvents expects there to be exactly four columns. We need to + // figure out if these really need to be in the DB, and if so, we need a + // better permanent fix for this. - neilalexander, 2 Jan 2020 + "SELECT added_at, event_json, 0 AS session_id, '' AS transaction_id" + + " FROM syncapi_current_room_state WHERE event_id = ANY($1)" + +type currentRoomStateStatements struct { + upsertRoomStateStmt *sql.Stmt + deleteRoomStateByEventIDStmt *sql.Stmt + selectRoomIDsWithMembershipStmt *sql.Stmt + selectCurrentStateStmt *sql.Stmt + selectJoinedUsersStmt *sql.Stmt + selectEventsWithEventIDsStmt *sql.Stmt + selectStateEventStmt *sql.Stmt +} + +func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(currentRoomStateSchema) + if err != nil { + return + } + if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil { + return + } + if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil { + return + } + if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil { + return + } + if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil { + return + } + if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil { + return + } + if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil { + return + } + if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { + return + } + return +} + +// JoinedMemberLists returns a map of room ID to a list of joined user IDs. +func (s *currentRoomStateStatements) selectJoinedUsers( + ctx context.Context, +) (map[string][]string, error) { + rows, err := s.selectJoinedUsersStmt.QueryContext(ctx) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + + result := make(map[string][]string) + for rows.Next() { + var roomID string + var userID string + if err := rows.Scan(&roomID, &userID); err != nil { + return nil, err + } + users := result[roomID] + users = append(users, userID) + result[roomID] = users + } + return result, nil +} + +// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state. +func (s *currentRoomStateStatements) selectRoomIDsWithMembership( + ctx context.Context, + txn *sql.Tx, + userID string, + membership string, // nolint: unparam +) ([]string, error) { + stmt := common.TxStmt(txn, s.selectRoomIDsWithMembershipStmt) + rows, err := stmt.QueryContext(ctx, userID, membership) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + + var result []string + for rows.Next() { + var roomID string + if err := rows.Scan(&roomID); err != nil { + return nil, err + } + result = append(result, roomID) + } + return result, nil +} + +// CurrentState returns all the current state events for the given room. +func (s *currentRoomStateStatements) selectCurrentState( + ctx context.Context, txn *sql.Tx, roomID string, + stateFilterPart *gomatrixserverlib.FilterPart, +) ([]gomatrixserverlib.Event, error) { + stmt := common.TxStmt(txn, s.selectCurrentStateStmt) + rows, err := stmt.QueryContext(ctx, roomID, + pq.StringArray(stateFilterPart.Senders), + pq.StringArray(stateFilterPart.NotSenders), + pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)), + pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)), + stateFilterPart.ContainsURL, + stateFilterPart.Limit, + ) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + + return rowsToEvents(rows) +} + +func (s *currentRoomStateStatements) deleteRoomStateByEventID( + ctx context.Context, txn *sql.Tx, eventID string, +) error { + stmt := common.TxStmt(txn, s.deleteRoomStateByEventIDStmt) + _, err := stmt.ExecContext(ctx, eventID) + return err +} + +func (s *currentRoomStateStatements) upsertRoomState( + ctx context.Context, txn *sql.Tx, + event gomatrixserverlib.Event, membership *string, addedAt int64, +) error { + // Parse content as JSON and search for an "url" key + containsURL := false + var content map[string]interface{} + if json.Unmarshal(event.Content(), &content) != nil { + // Set containsURL to true if url is present + _, containsURL = content["url"] + } + + // upsert state event + stmt := common.TxStmt(txn, s.upsertRoomStateStmt) + _, err := stmt.ExecContext( + ctx, + event.RoomID(), + event.EventID(), + event.Type(), + event.Sender(), + containsURL, + *event.StateKey(), + event.JSON(), + membership, + addedAt, + ) + return err +} + +func (s *currentRoomStateStatements) selectEventsWithEventIDs( + ctx context.Context, txn *sql.Tx, eventIDs []string, +) ([]streamEvent, error) { + stmt := common.TxStmt(txn, s.selectEventsWithEventIDsStmt) + rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs)) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + return rowsToStreamEvents(rows) +} + +func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { + result := []gomatrixserverlib.Event{} + for rows.Next() { + var eventBytes []byte + if err := rows.Scan(&eventBytes); err != nil { + return nil, err + } + // TODO: Handle redacted events + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) + if err != nil { + return nil, err + } + result = append(result, ev) + } + return result, nil +} + +func (s *currentRoomStateStatements) selectStateEvent( + ctx context.Context, roomID, evType, stateKey string, +) (*gomatrixserverlib.Event, error) { + stmt := s.selectStateEventStmt + var res []byte + err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false) + return &ev, err +} diff --git a/syncapi/storage/postgres/filtering.go b/syncapi/storage/postgres/filtering.go new file mode 100644 index 00000000..dcc42136 --- /dev/null +++ b/syncapi/storage/postgres/filtering.go @@ -0,0 +1,36 @@ +// Copyright 2017 Thibaut CHARLES +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "strings" +) + +// filterConvertWildcardToSQL converts wildcards as defined in +// https://matrix.org/docs/spec/client_server/r0.3.0.html#post-matrix-client-r0-user-userid-filter +// to SQL wildcards that can be used with LIKE() +func filterConvertTypeWildcardToSQL(values []string) []string { + if values == nil { + // Return nil instead of []string{} so IS NULL can work correctly when + // the return value is passed into SQL queries + return nil + } + + ret := make([]string, len(values)) + for i := range values { + ret[i] = strings.Replace(values[i], "*", "%", -1) + } + return ret +} diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go new file mode 100644 index 00000000..ced4bfc4 --- /dev/null +++ b/syncapi/storage/postgres/invites_table.go @@ -0,0 +1,148 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrixserverlib" +) + +const inviteEventsSchema = ` +CREATE TABLE IF NOT EXISTS syncapi_invite_events ( + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + target_user_id TEXT NOT NULL, + event_json TEXT NOT NULL +); + +-- For looking up the invites for a given user. +CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx + ON syncapi_invite_events (target_user_id, id); + +-- For deleting old invites +CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx + ON syncapi_invite_events (event_id); +` + +const insertInviteEventSQL = "" + + "INSERT INTO syncapi_invite_events (" + + " room_id, event_id, target_user_id, event_json" + + ") VALUES ($1, $2, $3, $4) RETURNING id" + +const deleteInviteEventSQL = "" + + "DELETE FROM syncapi_invite_events WHERE event_id = $1" + +const selectInviteEventsInRangeSQL = "" + + "SELECT room_id, event_json FROM syncapi_invite_events" + + " WHERE target_user_id = $1 AND id > $2 AND id <= $3" + + " ORDER BY id DESC" + +const selectMaxInviteIDSQL = "" + + "SELECT MAX(id) FROM syncapi_invite_events" + +type inviteEventsStatements struct { + insertInviteEventStmt *sql.Stmt + selectInviteEventsInRangeStmt *sql.Stmt + deleteInviteEventStmt *sql.Stmt + selectMaxInviteIDStmt *sql.Stmt +} + +func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(inviteEventsSchema) + if err != nil { + return + } + if s.insertInviteEventStmt, err = db.Prepare(insertInviteEventSQL); err != nil { + return + } + if s.selectInviteEventsInRangeStmt, err = db.Prepare(selectInviteEventsInRangeSQL); err != nil { + return + } + if s.deleteInviteEventStmt, err = db.Prepare(deleteInviteEventSQL); err != nil { + return + } + if s.selectMaxInviteIDStmt, err = db.Prepare(selectMaxInviteIDSQL); err != nil { + return + } + return +} + +func (s *inviteEventsStatements) insertInviteEvent( + ctx context.Context, inviteEvent gomatrixserverlib.Event, +) (streamPos int64, err error) { + err = s.insertInviteEventStmt.QueryRowContext( + ctx, + inviteEvent.RoomID(), + inviteEvent.EventID(), + *inviteEvent.StateKey(), + inviteEvent.JSON(), + ).Scan(&streamPos) + return +} + +func (s *inviteEventsStatements) deleteInviteEvent( + ctx context.Context, inviteEventID string, +) error { + _, err := s.deleteInviteEventStmt.ExecContext(ctx, inviteEventID) + return err +} + +// selectInviteEventsInRange returns a map of room ID to invite event for the +// active invites for the target user ID in the supplied range. +func (s *inviteEventsStatements) selectInviteEventsInRange( + ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos int64, +) (map[string]gomatrixserverlib.Event, error) { + stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt) + rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + result := map[string]gomatrixserverlib.Event{} + for rows.Next() { + var ( + roomID string + eventJSON []byte + ) + if err = rows.Scan(&roomID, &eventJSON); err != nil { + return nil, err + } + + event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false) + if err != nil { + return nil, err + } + + result[roomID] = event + } + return result, nil +} + +func (s *inviteEventsStatements) selectMaxInviteID( + ctx context.Context, txn *sql.Tx, +) (id int64, err error) { + var nullableID sql.NullInt64 + stmt := common.TxStmt(txn, s.selectMaxInviteIDStmt) + err = stmt.QueryRowContext(ctx).Scan(&nullableID) + if nullableID.Valid { + id = nullableID.Int64 + } + return +} diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go new file mode 100644 index 00000000..3927f0c3 --- /dev/null +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -0,0 +1,328 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + "encoding/json" + "sort" + + "github.com/matrix-org/dendrite/roomserver/api" + + "github.com/lib/pq" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" +) + +const outputRoomEventsSchema = ` +-- This sequence is shared between all the tables generated from kafka logs. +CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id; + +-- Stores output room events received from the roomserver. +CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( + -- An incrementing ID which denotes the position in the log that this event resides at. + -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments. + -- This isn't a problem for us since we just want to order by this field. + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), + -- The event ID for the event + event_id TEXT NOT NULL, + -- The 'room_id' key for the event. + room_id TEXT NOT NULL, + -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. + event_json TEXT NOT NULL, + -- The event type e.g 'm.room.member'. + type TEXT NOT NULL, + -- The 'sender' property of the event. + sender TEXT NOT NULL, + -- true if the event content contains a url key. + contains_url BOOL NOT NULL, + -- A list of event IDs which represent a delta of added/removed room state. This can be NULL + -- if there is no delta. + add_state_ids TEXT[], + remove_state_ids TEXT[], + session_id BIGINT, -- The client session that sent the event, if any + transaction_id TEXT -- The transaction id used to send the event, if any +); +-- for event selection +CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id); +` + +const insertEventSQL = "" + + "INSERT INTO syncapi_output_room_events (" + + "room_id, event_id, event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id" + + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id" + +const selectEventsSQL = "" + + "SELECT id, event_json, session_id, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)" + +const selectRecentEventsSQL = "" + + "SELECT id, event_json, session_id, transaction_id FROM syncapi_output_room_events" + + " WHERE room_id = $1 AND id > $2 AND id <= $3" + + " ORDER BY id DESC LIMIT $4" + +const selectMaxEventIDSQL = "" + + "SELECT MAX(id) FROM syncapi_output_room_events" + +// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). +const selectStateInRangeSQL = "" + + "SELECT id, event_json, add_state_ids, remove_state_ids" + + " FROM syncapi_output_room_events" + + " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + + " AND ( $3::text[] IS NULL OR sender = ANY($3) )" + + " AND ( $4::text[] IS NULL OR NOT(sender = ANY($4)) )" + + " AND ( $5::text[] IS NULL OR type LIKE ANY($5) )" + + " AND ( $6::text[] IS NULL OR NOT(type LIKE ANY($6)) )" + + " AND ( $7::bool IS NULL OR contains_url = $7 )" + + " ORDER BY id ASC" + + " LIMIT $8" + +type outputRoomEventsStatements struct { + insertEventStmt *sql.Stmt + selectEventsStmt *sql.Stmt + selectMaxEventIDStmt *sql.Stmt + selectRecentEventsStmt *sql.Stmt + selectStateInRangeStmt *sql.Stmt +} + +func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(outputRoomEventsSchema) + if err != nil { + return + } + if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { + return + } + if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil { + return + } + if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil { + return + } + if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { + return + } + if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { + return + } + return +} + +// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos. +// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the +// two positions, only the most recent state is returned. +func (s *outputRoomEventsStatements) selectStateInRange( + ctx context.Context, txn *sql.Tx, oldPos, newPos int64, + stateFilterPart *gomatrixserverlib.FilterPart, +) (map[string]map[string]bool, map[string]streamEvent, error) { + stmt := common.TxStmt(txn, s.selectStateInRangeStmt) + + rows, err := stmt.QueryContext( + ctx, oldPos, newPos, + pq.StringArray(stateFilterPart.Senders), + pq.StringArray(stateFilterPart.NotSenders), + pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)), + pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)), + stateFilterPart.ContainsURL, + stateFilterPart.Limit, + ) + if err != nil { + return nil, nil, err + } + // Fetch all the state change events for all rooms between the two positions then loop each event and: + // - Keep a cache of the event by ID (99% of state change events are for the event itself) + // - For each room ID, build up an array of event IDs which represents cumulative adds/removes + // For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID + // if they aren't in the event ID cache. We don't handle state deletion yet. + eventIDToEvent := make(map[string]streamEvent) + + // RoomID => A set (map[string]bool) of state event IDs which are between the two positions + stateNeeded := make(map[string]map[string]bool) + + for rows.Next() { + var ( + streamPos int64 + eventBytes []byte + addIDs pq.StringArray + delIDs pq.StringArray + ) + if err := rows.Scan(&streamPos, &eventBytes, &addIDs, &delIDs); err != nil { + return nil, nil, err + } + // Sanity check for deleted state and whine if we see it. We don't need to do anything + // since it'll just mark the event as not being needed. + if len(addIDs) < len(delIDs) { + log.WithFields(log.Fields{ + "since": oldPos, + "current": newPos, + "adds": addIDs, + "dels": delIDs, + }).Warn("StateBetween: ignoring deleted state") + } + + // TODO: Handle redacted events + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) + if err != nil { + return nil, nil, err + } + needSet := stateNeeded[ev.RoomID()] + if needSet == nil { // make set if required + needSet = make(map[string]bool) + } + for _, id := range delIDs { + needSet[id] = false + } + for _, id := range addIDs { + needSet[id] = true + } + stateNeeded[ev.RoomID()] = needSet + + eventIDToEvent[ev.EventID()] = streamEvent{ + Event: ev, + streamPosition: streamPos, + } + } + + return stateNeeded, eventIDToEvent, nil +} + +// MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied, +// then this function should only ever be used at startup, as it will race with inserting events if it is +// done afterwards. If there are no inserted events, 0 is returned. +func (s *outputRoomEventsStatements) selectMaxEventID( + ctx context.Context, txn *sql.Tx, +) (id int64, err error) { + var nullableID sql.NullInt64 + stmt := common.TxStmt(txn, s.selectMaxEventIDStmt) + err = stmt.QueryRowContext(ctx).Scan(&nullableID) + if nullableID.Valid { + id = nullableID.Int64 + } + return +} + +// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. Returns the position +// of the inserted event. +func (s *outputRoomEventsStatements) insertEvent( + ctx context.Context, txn *sql.Tx, + event *gomatrixserverlib.Event, addState, removeState []string, + transactionID *api.TransactionID, +) (streamPos int64, err error) { + var txnID *string + var sessionID *int64 + if transactionID != nil { + sessionID = &transactionID.SessionID + txnID = &transactionID.TransactionID + } + + // Parse content as JSON and search for an "url" key + containsURL := false + var content map[string]interface{} + if json.Unmarshal(event.Content(), &content) != nil { + // Set containsURL to true if url is present + _, containsURL = content["url"] + } + + stmt := common.TxStmt(txn, s.insertEventStmt) + err = stmt.QueryRowContext( + ctx, + event.RoomID(), + event.EventID(), + event.JSON(), + event.Type(), + event.Sender(), + containsURL, + pq.StringArray(addState), + pq.StringArray(removeState), + sessionID, + txnID, + ).Scan(&streamPos) + return +} + +// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'. +func (s *outputRoomEventsStatements) selectRecentEvents( + ctx context.Context, txn *sql.Tx, + roomID string, fromPos, toPos int64, limit int, +) ([]streamEvent, error) { + stmt := common.TxStmt(txn, s.selectRecentEventsStmt) + rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + events, err := rowsToStreamEvents(rows) + if err != nil { + return nil, err + } + // The events need to be returned from oldest to latest, which isn't + // necessarily the way the SQL query returns them, so a sort is necessary to + // ensure the events are in the right order in the slice. + sort.SliceStable(events, func(i int, j int) bool { + return events[i].streamPosition < events[j].streamPosition + }) + return events, nil +} + +// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing +// from the database. +func (s *outputRoomEventsStatements) selectEvents( + ctx context.Context, txn *sql.Tx, eventIDs []string, +) ([]streamEvent, error) { + stmt := common.TxStmt(txn, s.selectEventsStmt) + rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs)) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + return rowsToStreamEvents(rows) +} + +func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) { + var result []streamEvent + for rows.Next() { + var ( + streamPos int64 + eventBytes []byte + sessionID *int64 + txnID *string + transactionID *api.TransactionID + ) + if err := rows.Scan(&streamPos, &eventBytes, &sessionID, &txnID); err != nil { + return nil, err + } + // TODO: Handle redacted events + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) + if err != nil { + return nil, err + } + + if sessionID != nil && txnID != nil { + transactionID = &api.TransactionID{ + SessionID: *sessionID, + TransactionID: *txnID, + } + } + + result = append(result, streamEvent{ + Event: ev, + streamPosition: streamPos, + transactionID: transactionID, + }) + } + return result, nil +} diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go new file mode 100644 index 00000000..fc7b4e40 --- /dev/null +++ b/syncapi/storage/postgres/syncserver.go @@ -0,0 +1,948 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "strconv" + "time" + + "github.com/sirupsen/logrus" + + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/roomserver/api" + + // Import the postgres database driver. + _ "github.com/lib/pq" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/dendrite/typingserver/cache" + "github.com/matrix-org/gomatrixserverlib" +) + +type stateDelta struct { + roomID string + stateEvents []gomatrixserverlib.Event + membership string + // The PDU stream position of the latest membership event for this user, if applicable. + // Can be 0 if there is no membership event in this delta. + membershipPos int64 +} + +// Same as gomatrixserverlib.Event but also has the PDU stream position for this event. +type streamEvent struct { + gomatrixserverlib.Event + streamPosition int64 + transactionID *api.TransactionID +} + +// SyncServerDatabase represents a sync server datasource which manages +// both the database for PDUs and caches for EDUs. +type SyncServerDatasource struct { + db *sql.DB + common.PartitionOffsetStatements + accountData accountDataStatements + events outputRoomEventsStatements + roomstate currentRoomStateStatements + invites inviteEventsStatements + typingCache *cache.TypingCache +} + +// NewSyncServerDatabase creates a new sync server database +func NewSyncServerDatasource(dbDataSourceName string) (*SyncServerDatasource, error) { + var d SyncServerDatasource + var err error + if d.db, err = sql.Open("postgres", dbDataSourceName); err != nil { + return nil, err + } + if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil { + return nil, err + } + if err = d.accountData.prepare(d.db); err != nil { + return nil, err + } + if err = d.events.prepare(d.db); err != nil { + return nil, err + } + if err := d.roomstate.prepare(d.db); err != nil { + return nil, err + } + if err := d.invites.prepare(d.db); err != nil { + return nil, err + } + d.typingCache = cache.NewTypingCache() + return &d, nil +} + +// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. +func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) { + return d.roomstate.selectJoinedUsers(ctx) +} + +// Events lookups a list of event by their event ID. +// Returns a list of events matching the requested IDs found in the database. +// If an event is not found in the database then it will be omitted from the list. +// Returns an error if there was a problem talking with the database. +// Does not include any transaction IDs in the returned events. +func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) { + streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs) + if err != nil { + return nil, err + } + + // We don't include a device here as we only include transaction IDs in + // incremental syncs. + return streamEventsToEvents(nil, streamEvents), nil +} + +// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races +// when generating the sync stream position for this event. Returns the sync stream position for the inserted event. +// Returns an error if there was a problem inserting this event. +func (d *SyncServerDatasource) WriteEvent( + ctx context.Context, + ev *gomatrixserverlib.Event, + addStateEvents []gomatrixserverlib.Event, + addStateEventIDs, removeStateEventIDs []string, + transactionID *api.TransactionID, +) (pduPosition int64, returnErr error) { + returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { + var err error + pos, err := d.events.insertEvent(ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID) + if err != nil { + return err + } + pduPosition = pos + + if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { + // Nothing to do, the event may have just been a message event. + return nil + } + + return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition) + }) + return +} + +func (d *SyncServerDatasource) updateRoomState( + ctx context.Context, txn *sql.Tx, + removedEventIDs []string, + addedEvents []gomatrixserverlib.Event, + pduPosition int64, +) error { + // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add. + for _, eventID := range removedEventIDs { + if err := d.roomstate.deleteRoomStateByEventID(ctx, txn, eventID); err != nil { + return err + } + } + + for _, event := range addedEvents { + if event.StateKey() == nil { + // ignore non state events + continue + } + var membership *string + if event.Type() == "m.room.member" { + value, err := event.Membership() + if err != nil { + return err + } + membership = &value + } + if err := d.roomstate.upsertRoomState(ctx, txn, event, membership, pduPosition); err != nil { + return err + } + } + + return nil +} + +// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key +// If no event could be found, returns nil +// If there was an issue during the retrieval, returns an error +func (d *SyncServerDatasource) GetStateEvent( + ctx context.Context, roomID, evType, stateKey string, +) (*gomatrixserverlib.Event, error) { + return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey) +} + +// GetStateEventsForRoom fetches the state events for a given room. +// Returns an empty slice if no state events could be found for this room. +// Returns an error if there was an issue with the retrieval. +func (d *SyncServerDatasource) GetStateEventsForRoom( + ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.FilterPart, +) (stateEvents []gomatrixserverlib.Event, err error) { + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { + stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilterPart) + return err + }) + return +} + +// SyncPosition returns the latest positions for syncing. +func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.SyncPosition, error) { + return d.syncPositionTx(ctx, nil) +} + +func (d *SyncServerDatasource) syncPositionTx( + ctx context.Context, txn *sql.Tx, +) (sp types.SyncPosition, err error) { + + maxEventID, err := d.events.selectMaxEventID(ctx, txn) + if err != nil { + return sp, err + } + maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn) + if err != nil { + return sp, err + } + if maxAccountDataID > maxEventID { + maxEventID = maxAccountDataID + } + maxInviteID, err := d.invites.selectMaxInviteID(ctx, txn) + if err != nil { + return sp, err + } + if maxInviteID > maxEventID { + maxEventID = maxInviteID + } + sp.PDUPosition = maxEventID + + sp.TypingPosition = d.typingCache.GetLatestSyncPosition() + + return +} + +// addPDUDeltaToResponse adds all PDU deltas to a sync response. +// IDs of all rooms the user joined are returned so EDU deltas can be added for them. +func (d *SyncServerDatasource) addPDUDeltaToResponse( + ctx context.Context, + device authtypes.Device, + fromPos, toPos int64, + numRecentEventsPerRoom int, + wantFullState bool, + res *types.Response, +) ([]string, error) { + txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) + if err != nil { + return nil, err + } + var succeeded bool + defer common.EndTransaction(txn, &succeeded) + + stateFilterPart := gomatrixserverlib.DefaultFilterPart() // TODO: use filter provided in request + + // Work out which rooms to return in the response. This is done by getting not only the currently + // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions. + // This works out what the 'state' key should be for each room as well as which membership block + // to put the room into. + var deltas []stateDelta + var joinedRoomIDs []string + if !wantFullState { + deltas, joinedRoomIDs, err = d.getStateDeltas( + ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilterPart, + ) + } else { + deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync( + ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilterPart, + ) + } + if err != nil { + return nil, err + } + + for _, delta := range deltas { + err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) + if err != nil { + return nil, err + } + } + + // TODO: This should be done in getStateDeltas + if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil { + return nil, err + } + + succeeded = true + return joinedRoomIDs, nil +} + +// addTypingDeltaToResponse adds all typing notifications to a sync response +// since the specified position. +func (d *SyncServerDatasource) addTypingDeltaToResponse( + since int64, + joinedRoomIDs []string, + res *types.Response, +) error { + var jr types.JoinResponse + var ok bool + var err error + for _, roomID := range joinedRoomIDs { + if typingUsers, updated := d.typingCache.GetTypingUsersIfUpdatedAfter( + roomID, since, + ); updated { + ev := gomatrixserverlib.ClientEvent{ + Type: gomatrixserverlib.MTyping, + } + ev.Content, err = json.Marshal(map[string]interface{}{ + "user_ids": typingUsers, + }) + if err != nil { + return err + } + + if jr, ok = res.Rooms.Join[roomID]; !ok { + jr = *types.NewJoinResponse() + } + jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) + res.Rooms.Join[roomID] = jr + } + } + return nil +} + +// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if +// the positions of that type are not equal in fromPos and toPos. +func (d *SyncServerDatasource) addEDUDeltaToResponse( + fromPos, toPos types.SyncPosition, + joinedRoomIDs []string, + res *types.Response, +) (err error) { + + if fromPos.TypingPosition != toPos.TypingPosition { + err = d.addTypingDeltaToResponse( + fromPos.TypingPosition, joinedRoomIDs, res, + ) + } + + return +} + +// IncrementalSync returns all the data needed in order to create an incremental +// sync response for the given user. Events returned will include any client +// transaction IDs associated with the given device. These transaction IDs come +// from when the device sent the event via an API that included a transaction +// ID. +func (d *SyncServerDatasource) IncrementalSync( + ctx context.Context, + device authtypes.Device, + fromPos, toPos types.SyncPosition, + numRecentEventsPerRoom int, + wantFullState bool, +) (*types.Response, error) { + nextBatchPos := fromPos.WithUpdates(toPos) + res := types.NewResponse(nextBatchPos) + + var joinedRoomIDs []string + var err error + if fromPos.PDUPosition != toPos.PDUPosition || wantFullState { + joinedRoomIDs, err = d.addPDUDeltaToResponse( + ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, wantFullState, res, + ) + } else { + joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership( + ctx, nil, device.UserID, gomatrixserverlib.Join, + ) + } + if err != nil { + return nil, err + } + + err = d.addEDUDeltaToResponse( + fromPos, toPos, joinedRoomIDs, res, + ) + if err != nil { + return nil, err + } + + return res, nil +} + +// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed +// to it. It returns toPos and joinedRoomIDs for use of adding EDUs. +func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( + ctx context.Context, + userID string, + numRecentEventsPerRoom int, +) ( + res *types.Response, + toPos types.SyncPosition, + joinedRoomIDs []string, + err error, +) { + // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have + // a consistent view of the database throughout. This includes extracting the sync position. + // This does have the unfortunate side-effect that all the matrixy logic resides in this function, + // but it's better to not hide the fact that this is being done in a transaction. + txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) + if err != nil { + return + } + var succeeded bool + defer common.EndTransaction(txn, &succeeded) + + // Get the current sync position which we will base the sync response on. + toPos, err = d.syncPositionTx(ctx, txn) + if err != nil { + return + } + + res = types.NewResponse(toPos) + + // Extract room state and recent events for all rooms the user is joined to. + joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) + if err != nil { + return + } + + stateFilterPart := gomatrixserverlib.DefaultFilterPart() // TODO: use filter provided in request + + // Build up a /sync response. Add joined rooms. + for _, roomID := range joinedRoomIDs { + var stateEvents []gomatrixserverlib.Event + stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilterPart) + if err != nil { + return + } + // TODO: When filters are added, we may need to call this multiple times to get enough events. + // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 + var recentStreamEvents []streamEvent + recentStreamEvents, err = d.events.selectRecentEvents( + ctx, txn, roomID, 0, toPos.PDUPosition, numRecentEventsPerRoom, + ) + if err != nil { + return + } + + // We don't include a device here as we don't need to send down + // transaction IDs for complete syncs + recentEvents := streamEventsToEvents(nil, recentStreamEvents) + + stateEvents = removeDuplicates(stateEvents, recentEvents) + jr := types.NewJoinResponse() + if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 { + // Use the short form of batch token for prev_batch + jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) + } else { + // Use the short form of batch token for prev_batch + jr.Timeline.PrevBatch = "1" + } + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = true + jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[roomID] = *jr + } + + if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition, res); err != nil { + return + } + + succeeded = true + return res, toPos, joinedRoomIDs, err +} + +// CompleteSync returns a complete /sync API response for the given user. +func (d *SyncServerDatasource) CompleteSync( + ctx context.Context, userID string, numRecentEventsPerRoom int, +) (*types.Response, error) { + res, toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync( + ctx, userID, numRecentEventsPerRoom, + ) + if err != nil { + return nil, err + } + + // Use a zero value SyncPosition for fromPos so all EDU states are added. + err = d.addEDUDeltaToResponse( + types.SyncPosition{}, toPos, joinedRoomIDs, res, + ) + if err != nil { + return nil, err + } + + return res, nil +} + +var txReadOnlySnapshot = sql.TxOptions{ + // Set the isolation level so that we see a snapshot of the database. + // In PostgreSQL repeatable read transactions will see a snapshot taken + // at the first query, and since the transaction is read-only it can't + // run into any serialisation errors. + // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, +} + +// GetAccountDataInRange returns all account data for a given user inserted or +// updated between two given positions +// Returns a map following the format data[roomID] = []dataTypes +// If no data is retrieved, returns an empty map +// If there was an issue with the retrieval, returns an error +func (d *SyncServerDatasource) GetAccountDataInRange( + ctx context.Context, userID string, oldPos, newPos int64, + accountDataFilterPart *gomatrixserverlib.FilterPart, +) (map[string][]string, error) { + return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart) +} + +// UpsertAccountData keeps track of new or updated account data, by saving the type +// of the new/updated data, and the user ID and room ID the data is related to (empty) +// room ID means the data isn't specific to any room) +// If no data with the given type, user ID and room ID exists in the database, +// creates a new row, else update the existing one +// Returns an error if there was an issue with the upsert +func (d *SyncServerDatasource) UpsertAccountData( + ctx context.Context, userID, roomID, dataType string, +) (int64, error) { + return d.accountData.insertAccountData(ctx, userID, roomID, dataType) +} + +// AddInviteEvent stores a new invite event for a user. +// If the invite was successfully stored this returns the stream ID it was stored at. +// Returns an error if there was a problem communicating with the database. +func (d *SyncServerDatasource) AddInviteEvent( + ctx context.Context, inviteEvent gomatrixserverlib.Event, +) (int64, error) { + return d.invites.insertInviteEvent(ctx, inviteEvent) +} + +// RetireInviteEvent removes an old invite event from the database. +// Returns an error if there was a problem communicating with the database. +func (d *SyncServerDatasource) RetireInviteEvent( + ctx context.Context, inviteEventID string, +) error { + // TODO: Record that invite has been retired in a stream so that we can + // notify the user in an incremental sync. + err := d.invites.deleteInviteEvent(ctx, inviteEventID) + return err +} + +func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) { + d.typingCache.SetTimeoutCallback(fn) +} + +// AddTypingUser adds a typing user to the typing cache. +// Returns the newly calculated sync position for typing notifications. +func (d *SyncServerDatasource) AddTypingUser( + userID, roomID string, expireTime *time.Time, +) int64 { + return d.typingCache.AddTypingUser(userID, roomID, expireTime) +} + +// RemoveTypingUser removes a typing user from the typing cache. +// Returns the newly calculated sync position for typing notifications. +func (d *SyncServerDatasource) RemoveTypingUser( + userID, roomID string, +) int64 { + return d.typingCache.RemoveUser(userID, roomID) +} + +func (d *SyncServerDatasource) addInvitesToResponse( + ctx context.Context, txn *sql.Tx, + userID string, + fromPos, toPos int64, + res *types.Response, +) error { + invites, err := d.invites.selectInviteEventsInRange( + ctx, txn, userID, int64(fromPos), int64(toPos), + ) + if err != nil { + return err + } + for roomID, inviteEvent := range invites { + ir := types.NewInviteResponse() + ir.InviteState.Events = gomatrixserverlib.ToClientEvents( + []gomatrixserverlib.Event{inviteEvent}, gomatrixserverlib.FormatSync, + ) + // TODO: add the invite state from the invite event. + res.Rooms.Invite[roomID] = *ir + } + return nil +} + +// addRoomDeltaToResponse adds a room state delta to a sync response +func (d *SyncServerDatasource) addRoomDeltaToResponse( + ctx context.Context, + device *authtypes.Device, + txn *sql.Tx, + fromPos, toPos int64, + delta stateDelta, + numRecentEventsPerRoom int, + res *types.Response, +) error { + endPos := toPos + if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave { + // make sure we don't leak recent events after the leave event. + // TODO: History visibility makes this somewhat complex to handle correctly. For example: + // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). + // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave + // in a single /sync request + // This is all "okay" assuming history_visibility == "shared" which it is by default. + endPos = delta.membershipPos + } + recentStreamEvents, err := d.events.selectRecentEvents( + ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom, + ) + if err != nil { + return err + } + recentEvents := streamEventsToEvents(device, recentStreamEvents) + delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back + + var prevPDUPos int64 + + if len(recentEvents) == 0 { + if len(delta.stateEvents) == 0 { + // Don't bother appending empty room entries + return nil + } + + // If full_state=true and since is already up to date, then we'll have + // state events but no recent events. + prevPDUPos = toPos - 1 + } else { + prevPDUPos = recentStreamEvents[0].streamPosition - 1 + } + + if prevPDUPos <= 0 { + prevPDUPos = 1 + } + + switch delta.membership { + case gomatrixserverlib.Join: + jr := types.NewJoinResponse() + // Use the short form of batch token for prev_batch + jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[delta.roomID] = *jr + case gomatrixserverlib.Leave: + fallthrough // transitions to leave are the same as ban + case gomatrixserverlib.Ban: + // TODO: recentEvents may contain events that this user is not allowed to see because they are + // no longer in the room. + lr := types.NewLeaveResponse() + // Use the short form of batch token for prev_batch + lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) + lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Leave[delta.roomID] = *lr + } + + return nil +} + +// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database. +// Returns a map of room ID to list of events. +func (d *SyncServerDatasource) fetchStateEvents( + ctx context.Context, txn *sql.Tx, + roomIDToEventIDSet map[string]map[string]bool, + eventIDToEvent map[string]streamEvent, +) (map[string][]streamEvent, error) { + stateBetween := make(map[string][]streamEvent) + missingEvents := make(map[string][]string) + for roomID, ids := range roomIDToEventIDSet { + events := stateBetween[roomID] + for id, need := range ids { + if !need { + continue // deleted state + } + e, ok := eventIDToEvent[id] + if ok { + events = append(events, e) + } else { + m := missingEvents[roomID] + m = append(m, id) + missingEvents[roomID] = m + } + } + stateBetween[roomID] = events + } + + if len(missingEvents) > 0 { + // This happens when add_state_ids has an event ID which is not in the provided range. + // We need to explicitly fetch them. + allMissingEventIDs := []string{} + for _, missingEvIDs := range missingEvents { + allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...) + } + evs, err := d.fetchMissingStateEvents(ctx, txn, allMissingEventIDs) + if err != nil { + return nil, err + } + // we know we got them all otherwise an error would've been returned, so just loop the events + for _, ev := range evs { + roomID := ev.RoomID() + stateBetween[roomID] = append(stateBetween[roomID], ev) + } + } + return stateBetween, nil +} + +func (d *SyncServerDatasource) fetchMissingStateEvents( + ctx context.Context, txn *sql.Tx, eventIDs []string, +) ([]streamEvent, error) { + // Fetch from the events table first so we pick up the stream ID for the + // event. + events, err := d.events.selectEvents(ctx, txn, eventIDs) + if err != nil { + return nil, err + } + + have := map[string]bool{} + for _, event := range events { + have[event.EventID()] = true + } + var missing []string + for _, eventID := range eventIDs { + if !have[eventID] { + missing = append(missing, eventID) + } + } + if len(missing) == 0 { + return events, nil + } + + // If they are missing from the events table then they should be state + // events that we received from outside the main event stream. + // These should be in the room state table. + stateEvents, err := d.roomstate.selectEventsWithEventIDs(ctx, txn, missing) + + if err != nil { + return nil, err + } + if len(stateEvents) != len(missing) { + return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing)) + } + events = append(events, stateEvents...) + return events, nil +} + +// getStateDeltas returns the state deltas between fromPos and toPos, +// exclusive of oldPos, inclusive of newPos, for the rooms in which +// the user has new membership events. +// A list of joined room IDs is also returned in case the caller needs it. +func (d *SyncServerDatasource) getStateDeltas( + ctx context.Context, device *authtypes.Device, txn *sql.Tx, + fromPos, toPos int64, userID string, + stateFilterPart *gomatrixserverlib.FilterPart, +) ([]stateDelta, []string, error) { + // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 + // - Get membership list changes for this user in this sync response + // - For each room which has membership list changes: + // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO). + // If it is, then we need to send the full room state down (and 'limited' is always true). + // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. + // * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block. + // - Get all CURRENTLY joined rooms, and add them to 'joined' block. + var deltas []stateDelta + + // get all the state events ever between these two positions + stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart) + if err != nil { + return nil, nil, err + } + state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) + if err != nil { + return nil, nil, err + } + + for roomID, stateStreamEvents := range state { + for _, ev := range stateStreamEvents { + // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event. + // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this, + // dupe join events will result in the entire room state coming down to the client again. This is added in + // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to + // the timeline. + if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { + if membership == gomatrixserverlib.Join { + // send full room state down instead of a delta + var s []streamEvent + s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilterPart) + if err != nil { + return nil, nil, err + } + state[roomID] = s + continue // we'll add this room in when we do joined rooms + } + + deltas = append(deltas, stateDelta{ + membership: membership, + membershipPos: ev.streamPosition, + stateEvents: streamEventsToEvents(device, stateStreamEvents), + roomID: roomID, + }) + break + } + } + } + + // Add in currently joined rooms + joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) + if err != nil { + return nil, nil, err + } + for _, joinedRoomID := range joinedRoomIDs { + deltas = append(deltas, stateDelta{ + membership: gomatrixserverlib.Join, + stateEvents: streamEventsToEvents(device, state[joinedRoomID]), + roomID: joinedRoomID, + }) + } + + return deltas, joinedRoomIDs, nil +} + +// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync +// requests with full_state=true. +// Fetches full state for all joined rooms and uses selectStateInRange to get +// updates for other rooms. +func (d *SyncServerDatasource) getStateDeltasForFullStateSync( + ctx context.Context, device *authtypes.Device, txn *sql.Tx, + fromPos, toPos int64, userID string, + stateFilterPart *gomatrixserverlib.FilterPart, +) ([]stateDelta, []string, error) { + joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) + if err != nil { + return nil, nil, err + } + + // Use a reasonable initial capacity + deltas := make([]stateDelta, 0, len(joinedRoomIDs)) + + // Add full states for all joined rooms + for _, joinedRoomID := range joinedRoomIDs { + s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, stateFilterPart) + if stateErr != nil { + return nil, nil, stateErr + } + deltas = append(deltas, stateDelta{ + membership: gomatrixserverlib.Join, + stateEvents: streamEventsToEvents(device, s), + roomID: joinedRoomID, + }) + } + + // Get all the state events ever between these two positions + stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart) + if err != nil { + return nil, nil, err + } + state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) + if err != nil { + return nil, nil, err + } + + for roomID, stateStreamEvents := range state { + for _, ev := range stateStreamEvents { + if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { + if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above. + deltas = append(deltas, stateDelta{ + membership: membership, + membershipPos: ev.streamPosition, + stateEvents: streamEventsToEvents(device, stateStreamEvents), + roomID: roomID, + }) + } + + break + } + } + } + + return deltas, joinedRoomIDs, nil +} + +func (d *SyncServerDatasource) currentStateStreamEventsForRoom( + ctx context.Context, txn *sql.Tx, roomID string, + stateFilterPart *gomatrixserverlib.FilterPart, +) ([]streamEvent, error) { + allState, err := d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilterPart) + if err != nil { + return nil, err + } + s := make([]streamEvent, len(allState)) + for i := 0; i < len(s); i++ { + s[i] = streamEvent{Event: allState[i], streamPosition: 0} + } + return s, nil +} + +// streamEventsToEvents converts streamEvent to Event. If device is non-nil and +// matches the streamevent.transactionID device then the transaction ID gets +// added to the unsigned section of the output event. +func streamEventsToEvents(device *authtypes.Device, in []streamEvent) []gomatrixserverlib.Event { + out := make([]gomatrixserverlib.Event, len(in)) + for i := 0; i < len(in); i++ { + out[i] = in[i].Event + if device != nil && in[i].transactionID != nil { + if device.UserID == in[i].Sender() && device.SessionID == in[i].transactionID.SessionID { + err := out[i].SetUnsignedField( + "transaction_id", in[i].transactionID.TransactionID, + ) + if err != nil { + logrus.WithFields(logrus.Fields{ + "event_id": out[i].EventID(), + }).WithError(err).Warnf("Failed to add transaction ID to event") + } + } + } + } + return out +} + +// There may be some overlap where events in stateEvents are already in recentEvents, so filter +// them out so we don't include them twice in the /sync response. They should be in recentEvents +// only, so clients get to the correct state once they have rolled forward. +func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gomatrixserverlib.Event { + for _, recentEv := range recentEvents { + if recentEv.StateKey() == nil { + continue // not a state event + } + // TODO: This is a linear scan over all the current state events in this room. This will + // be slow for big rooms. We should instead sort the state events by event ID (ORDER BY) + // then do a binary search to find matching events, similar to what roomserver does. + for j := 0; j < len(stateEvents); j++ { + if stateEvents[j].EventID() == recentEv.EventID() { + // overwrite the element to remove with the last element then pop the last element. + // This is orders of magnitude faster than re-slicing, but doesn't preserve ordering + // (we don't care about the order of stateEvents) + stateEvents[j] = stateEvents[len(stateEvents)-1] + stateEvents = stateEvents[:len(stateEvents)-1] + break // there shouldn't be multiple events with the same event ID + } + } + } + return stateEvents +} + +// getMembershipFromEvent returns the value of content.membership iff the event is a state event +// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned. +func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string { + if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) { + membership, err := ev.Membership() + if err != nil { + return "" + } + return membership + } + return "" +} |