aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/postgres')
-rw-r--r--syncapi/storage/postgres/account_data_table.go149
-rw-r--r--syncapi/storage/postgres/current_room_state_table.go285
-rw-r--r--syncapi/storage/postgres/filtering.go36
-rw-r--r--syncapi/storage/postgres/invites_table.go148
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go328
-rw-r--r--syncapi/storage/postgres/syncserver.go948
6 files changed, 1894 insertions, 0 deletions
diff --git a/syncapi/storage/postgres/account_data_table.go b/syncapi/storage/postgres/account_data_table.go
new file mode 100644
index 00000000..33cfffad
--- /dev/null
+++ b/syncapi/storage/postgres/account_data_table.go
@@ -0,0 +1,149 @@
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const accountDataSchema = `
+-- This sequence is shared between all the tables generated from kafka logs.
+CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id;
+
+-- Stores the types of account data that a user set has globally and in each room
+-- and the stream ID when that type was last updated.
+CREATE TABLE IF NOT EXISTS syncapi_account_data_type (
+ -- An incrementing ID which denotes the position in the log that this event resides at.
+ id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
+ -- ID of the user the data belongs to
+ user_id TEXT NOT NULL,
+ -- ID of the room the data is related to (empty string if not related to a specific room)
+ room_id TEXT NOT NULL,
+ -- Type of the data
+ type TEXT NOT NULL,
+
+ -- We don't want two entries of the same type for the same user
+ CONSTRAINT syncapi_account_data_unique UNIQUE (user_id, room_id, type)
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account_data_type(id, type);
+`
+
+const insertAccountDataSQL = "" +
+ "INSERT INTO syncapi_account_data_type (user_id, room_id, type) VALUES ($1, $2, $3)" +
+ " ON CONFLICT ON CONSTRAINT syncapi_account_data_unique" +
+ " DO UPDATE SET id = EXCLUDED.id" +
+ " RETURNING id"
+
+const selectAccountDataInRangeSQL = "" +
+ "SELECT room_id, type FROM syncapi_account_data_type" +
+ " WHERE user_id = $1 AND id > $2 AND id <= $3" +
+ " AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" +
+ " AND ( $5::text[] IS NULL OR NOT(type LIKE ANY($5)) )" +
+ " ORDER BY id ASC LIMIT $6"
+
+const selectMaxAccountDataIDSQL = "" +
+ "SELECT MAX(id) FROM syncapi_account_data_type"
+
+type accountDataStatements struct {
+ insertAccountDataStmt *sql.Stmt
+ selectAccountDataInRangeStmt *sql.Stmt
+ selectMaxAccountDataIDStmt *sql.Stmt
+}
+
+func (s *accountDataStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(accountDataSchema)
+ if err != nil {
+ return
+ }
+ if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil {
+ return
+ }
+ if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil {
+ return
+ }
+ if s.selectMaxAccountDataIDStmt, err = db.Prepare(selectMaxAccountDataIDSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *accountDataStatements) insertAccountData(
+ ctx context.Context,
+ userID, roomID, dataType string,
+) (pos int64, err error) {
+ err = s.insertAccountDataStmt.QueryRowContext(ctx, userID, roomID, dataType).Scan(&pos)
+ return
+}
+
+func (s *accountDataStatements) selectAccountDataInRange(
+ ctx context.Context,
+ userID string,
+ oldPos, newPos int64,
+ accountDataFilterPart *gomatrixserverlib.FilterPart,
+) (data map[string][]string, err error) {
+ data = make(map[string][]string)
+
+ // If both positions are the same, it means that the data was saved after the
+ // latest room event. In that case, we need to decrement the old position as
+ // it would prevent the SQL request from returning anything.
+ if oldPos == newPos {
+ oldPos--
+ }
+
+ rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos,
+ pq.StringArray(filterConvertTypeWildcardToSQL(accountDataFilterPart.Types)),
+ pq.StringArray(filterConvertTypeWildcardToSQL(accountDataFilterPart.NotTypes)),
+ accountDataFilterPart.Limit,
+ )
+ if err != nil {
+ return
+ }
+
+ for rows.Next() {
+ var dataType string
+ var roomID string
+
+ if err = rows.Scan(&roomID, &dataType); err != nil {
+ return
+ }
+
+ if len(data[roomID]) > 0 {
+ data[roomID] = append(data[roomID], dataType)
+ } else {
+ data[roomID] = []string{dataType}
+ }
+ }
+
+ return
+}
+
+func (s *accountDataStatements) selectMaxAccountDataID(
+ ctx context.Context, txn *sql.Tx,
+) (id int64, err error) {
+ var nullableID sql.NullInt64
+ stmt := common.TxStmt(txn, s.selectMaxAccountDataIDStmt)
+ err = stmt.QueryRowContext(ctx).Scan(&nullableID)
+ if nullableID.Valid {
+ id = nullableID.Int64
+ }
+ return
+}
diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go
new file mode 100644
index 00000000..dbfa111b
--- /dev/null
+++ b/syncapi/storage/postgres/current_room_state_table.go
@@ -0,0 +1,285 @@
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+
+ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const currentRoomStateSchema = `
+-- Stores the current room state for every room.
+CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
+ -- The 'room_id' key for the state event.
+ room_id TEXT NOT NULL,
+ -- The state event ID
+ event_id TEXT NOT NULL,
+ -- The state event type e.g 'm.room.member'
+ type TEXT NOT NULL,
+ -- The 'sender' property of the event.
+ sender TEXT NOT NULL,
+ -- true if the event content contains a url key
+ contains_url BOOL NOT NULL,
+ -- The state_key value for this state event e.g ''
+ state_key TEXT NOT NULL,
+ -- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
+ event_json TEXT NOT NULL,
+ -- The 'content.membership' value if this event is an m.room.member event. For other
+ -- events, this will be NULL.
+ membership TEXT,
+ -- The serial ID of the output_room_events table when this event became
+ -- part of the current state of the room.
+ added_at BIGINT,
+ -- Clobber based on 3-uple of room_id, type and state_key
+ CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key)
+);
+-- for event deletion
+CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id, room_id, type, sender, contains_url);
+-- for querying membership states of users
+CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave';
+`
+
+const upsertRoomStateSQL = "" +
+ "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, event_json, membership, added_at)" +
+ " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" +
+ " ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" +
+ " DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, event_json = $7, membership = $8, added_at = $9"
+
+const deleteRoomStateByEventIDSQL = "" +
+ "DELETE FROM syncapi_current_room_state WHERE event_id = $1"
+
+const selectRoomIDsWithMembershipSQL = "" +
+ "SELECT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2"
+
+const selectCurrentStateSQL = "" +
+ "SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1" +
+ " AND ( $2::text[] IS NULL OR sender = ANY($2) )" +
+ " AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" +
+ " AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" +
+ " AND ( $5::text[] IS NULL OR NOT(type LIKE ANY($5)) )" +
+ " AND ( $6::bool IS NULL OR contains_url = $6 )" +
+ " LIMIT $7"
+
+const selectJoinedUsersSQL = "" +
+ "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
+
+const selectStateEventSQL = "" +
+ "SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
+
+const selectEventsWithEventIDsSQL = "" +
+ // TODO: The session_id and transaction_id blanks are here because otherwise
+ // the rowsToStreamEvents expects there to be exactly four columns. We need to
+ // figure out if these really need to be in the DB, and if so, we need a
+ // better permanent fix for this. - neilalexander, 2 Jan 2020
+ "SELECT added_at, event_json, 0 AS session_id, '' AS transaction_id" +
+ " FROM syncapi_current_room_state WHERE event_id = ANY($1)"
+
+type currentRoomStateStatements struct {
+ upsertRoomStateStmt *sql.Stmt
+ deleteRoomStateByEventIDStmt *sql.Stmt
+ selectRoomIDsWithMembershipStmt *sql.Stmt
+ selectCurrentStateStmt *sql.Stmt
+ selectJoinedUsersStmt *sql.Stmt
+ selectEventsWithEventIDsStmt *sql.Stmt
+ selectStateEventStmt *sql.Stmt
+}
+
+func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(currentRoomStateSchema)
+ if err != nil {
+ return
+ }
+ if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil {
+ return
+ }
+ if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil {
+ return
+ }
+ if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
+ return
+ }
+ if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil {
+ return
+ }
+ if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
+ return
+ }
+ if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
+ return
+ }
+ if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
+ return
+ }
+ return
+}
+
+// JoinedMemberLists returns a map of room ID to a list of joined user IDs.
+func (s *currentRoomStateStatements) selectJoinedUsers(
+ ctx context.Context,
+) (map[string][]string, error) {
+ rows, err := s.selectJoinedUsersStmt.QueryContext(ctx)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+
+ result := make(map[string][]string)
+ for rows.Next() {
+ var roomID string
+ var userID string
+ if err := rows.Scan(&roomID, &userID); err != nil {
+ return nil, err
+ }
+ users := result[roomID]
+ users = append(users, userID)
+ result[roomID] = users
+ }
+ return result, nil
+}
+
+// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
+func (s *currentRoomStateStatements) selectRoomIDsWithMembership(
+ ctx context.Context,
+ txn *sql.Tx,
+ userID string,
+ membership string, // nolint: unparam
+) ([]string, error) {
+ stmt := common.TxStmt(txn, s.selectRoomIDsWithMembershipStmt)
+ rows, err := stmt.QueryContext(ctx, userID, membership)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+
+ var result []string
+ for rows.Next() {
+ var roomID string
+ if err := rows.Scan(&roomID); err != nil {
+ return nil, err
+ }
+ result = append(result, roomID)
+ }
+ return result, nil
+}
+
+// CurrentState returns all the current state events for the given room.
+func (s *currentRoomStateStatements) selectCurrentState(
+ ctx context.Context, txn *sql.Tx, roomID string,
+ stateFilterPart *gomatrixserverlib.FilterPart,
+) ([]gomatrixserverlib.Event, error) {
+ stmt := common.TxStmt(txn, s.selectCurrentStateStmt)
+ rows, err := stmt.QueryContext(ctx, roomID,
+ pq.StringArray(stateFilterPart.Senders),
+ pq.StringArray(stateFilterPart.NotSenders),
+ pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)),
+ pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)),
+ stateFilterPart.ContainsURL,
+ stateFilterPart.Limit,
+ )
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+
+ return rowsToEvents(rows)
+}
+
+func (s *currentRoomStateStatements) deleteRoomStateByEventID(
+ ctx context.Context, txn *sql.Tx, eventID string,
+) error {
+ stmt := common.TxStmt(txn, s.deleteRoomStateByEventIDStmt)
+ _, err := stmt.ExecContext(ctx, eventID)
+ return err
+}
+
+func (s *currentRoomStateStatements) upsertRoomState(
+ ctx context.Context, txn *sql.Tx,
+ event gomatrixserverlib.Event, membership *string, addedAt int64,
+) error {
+ // Parse content as JSON and search for an "url" key
+ containsURL := false
+ var content map[string]interface{}
+ if json.Unmarshal(event.Content(), &content) != nil {
+ // Set containsURL to true if url is present
+ _, containsURL = content["url"]
+ }
+
+ // upsert state event
+ stmt := common.TxStmt(txn, s.upsertRoomStateStmt)
+ _, err := stmt.ExecContext(
+ ctx,
+ event.RoomID(),
+ event.EventID(),
+ event.Type(),
+ event.Sender(),
+ containsURL,
+ *event.StateKey(),
+ event.JSON(),
+ membership,
+ addedAt,
+ )
+ return err
+}
+
+func (s *currentRoomStateStatements) selectEventsWithEventIDs(
+ ctx context.Context, txn *sql.Tx, eventIDs []string,
+) ([]streamEvent, error) {
+ stmt := common.TxStmt(txn, s.selectEventsWithEventIDsStmt)
+ rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs))
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+ return rowsToStreamEvents(rows)
+}
+
+func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
+ result := []gomatrixserverlib.Event{}
+ for rows.Next() {
+ var eventBytes []byte
+ if err := rows.Scan(&eventBytes); err != nil {
+ return nil, err
+ }
+ // TODO: Handle redacted events
+ ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
+ if err != nil {
+ return nil, err
+ }
+ result = append(result, ev)
+ }
+ return result, nil
+}
+
+func (s *currentRoomStateStatements) selectStateEvent(
+ ctx context.Context, roomID, evType, stateKey string,
+) (*gomatrixserverlib.Event, error) {
+ stmt := s.selectStateEventStmt
+ var res []byte
+ err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false)
+ return &ev, err
+}
diff --git a/syncapi/storage/postgres/filtering.go b/syncapi/storage/postgres/filtering.go
new file mode 100644
index 00000000..dcc42136
--- /dev/null
+++ b/syncapi/storage/postgres/filtering.go
@@ -0,0 +1,36 @@
+// Copyright 2017 Thibaut CHARLES
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "strings"
+)
+
+// filterConvertWildcardToSQL converts wildcards as defined in
+// https://matrix.org/docs/spec/client_server/r0.3.0.html#post-matrix-client-r0-user-userid-filter
+// to SQL wildcards that can be used with LIKE()
+func filterConvertTypeWildcardToSQL(values []string) []string {
+ if values == nil {
+ // Return nil instead of []string{} so IS NULL can work correctly when
+ // the return value is passed into SQL queries
+ return nil
+ }
+
+ ret := make([]string, len(values))
+ for i := range values {
+ ret[i] = strings.Replace(values[i], "*", "%", -1)
+ }
+ return ret
+}
diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go
new file mode 100644
index 00000000..ced4bfc4
--- /dev/null
+++ b/syncapi/storage/postgres/invites_table.go
@@ -0,0 +1,148 @@
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const inviteEventsSchema = `
+CREATE TABLE IF NOT EXISTS syncapi_invite_events (
+ id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ target_user_id TEXT NOT NULL,
+ event_json TEXT NOT NULL
+);
+
+-- For looking up the invites for a given user.
+CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx
+ ON syncapi_invite_events (target_user_id, id);
+
+-- For deleting old invites
+CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx
+ ON syncapi_invite_events (event_id);
+`
+
+const insertInviteEventSQL = "" +
+ "INSERT INTO syncapi_invite_events (" +
+ " room_id, event_id, target_user_id, event_json" +
+ ") VALUES ($1, $2, $3, $4) RETURNING id"
+
+const deleteInviteEventSQL = "" +
+ "DELETE FROM syncapi_invite_events WHERE event_id = $1"
+
+const selectInviteEventsInRangeSQL = "" +
+ "SELECT room_id, event_json FROM syncapi_invite_events" +
+ " WHERE target_user_id = $1 AND id > $2 AND id <= $3" +
+ " ORDER BY id DESC"
+
+const selectMaxInviteIDSQL = "" +
+ "SELECT MAX(id) FROM syncapi_invite_events"
+
+type inviteEventsStatements struct {
+ insertInviteEventStmt *sql.Stmt
+ selectInviteEventsInRangeStmt *sql.Stmt
+ deleteInviteEventStmt *sql.Stmt
+ selectMaxInviteIDStmt *sql.Stmt
+}
+
+func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(inviteEventsSchema)
+ if err != nil {
+ return
+ }
+ if s.insertInviteEventStmt, err = db.Prepare(insertInviteEventSQL); err != nil {
+ return
+ }
+ if s.selectInviteEventsInRangeStmt, err = db.Prepare(selectInviteEventsInRangeSQL); err != nil {
+ return
+ }
+ if s.deleteInviteEventStmt, err = db.Prepare(deleteInviteEventSQL); err != nil {
+ return
+ }
+ if s.selectMaxInviteIDStmt, err = db.Prepare(selectMaxInviteIDSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *inviteEventsStatements) insertInviteEvent(
+ ctx context.Context, inviteEvent gomatrixserverlib.Event,
+) (streamPos int64, err error) {
+ err = s.insertInviteEventStmt.QueryRowContext(
+ ctx,
+ inviteEvent.RoomID(),
+ inviteEvent.EventID(),
+ *inviteEvent.StateKey(),
+ inviteEvent.JSON(),
+ ).Scan(&streamPos)
+ return
+}
+
+func (s *inviteEventsStatements) deleteInviteEvent(
+ ctx context.Context, inviteEventID string,
+) error {
+ _, err := s.deleteInviteEventStmt.ExecContext(ctx, inviteEventID)
+ return err
+}
+
+// selectInviteEventsInRange returns a map of room ID to invite event for the
+// active invites for the target user ID in the supplied range.
+func (s *inviteEventsStatements) selectInviteEventsInRange(
+ ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos int64,
+) (map[string]gomatrixserverlib.Event, error) {
+ stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt)
+ rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+ result := map[string]gomatrixserverlib.Event{}
+ for rows.Next() {
+ var (
+ roomID string
+ eventJSON []byte
+ )
+ if err = rows.Scan(&roomID, &eventJSON); err != nil {
+ return nil, err
+ }
+
+ event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false)
+ if err != nil {
+ return nil, err
+ }
+
+ result[roomID] = event
+ }
+ return result, nil
+}
+
+func (s *inviteEventsStatements) selectMaxInviteID(
+ ctx context.Context, txn *sql.Tx,
+) (id int64, err error) {
+ var nullableID sql.NullInt64
+ stmt := common.TxStmt(txn, s.selectMaxInviteIDStmt)
+ err = stmt.QueryRowContext(ctx).Scan(&nullableID)
+ if nullableID.Valid {
+ id = nullableID.Int64
+ }
+ return
+}
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
new file mode 100644
index 00000000..3927f0c3
--- /dev/null
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -0,0 +1,328 @@
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+ "sort"
+
+ "github.com/matrix-org/dendrite/roomserver/api"
+
+ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/gomatrixserverlib"
+ log "github.com/sirupsen/logrus"
+)
+
+const outputRoomEventsSchema = `
+-- This sequence is shared between all the tables generated from kafka logs.
+CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id;
+
+-- Stores output room events received from the roomserver.
+CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
+ -- An incrementing ID which denotes the position in the log that this event resides at.
+ -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments.
+ -- This isn't a problem for us since we just want to order by this field.
+ id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
+ -- The event ID for the event
+ event_id TEXT NOT NULL,
+ -- The 'room_id' key for the event.
+ room_id TEXT NOT NULL,
+ -- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
+ event_json TEXT NOT NULL,
+ -- The event type e.g 'm.room.member'.
+ type TEXT NOT NULL,
+ -- The 'sender' property of the event.
+ sender TEXT NOT NULL,
+ -- true if the event content contains a url key.
+ contains_url BOOL NOT NULL,
+ -- A list of event IDs which represent a delta of added/removed room state. This can be NULL
+ -- if there is no delta.
+ add_state_ids TEXT[],
+ remove_state_ids TEXT[],
+ session_id BIGINT, -- The client session that sent the event, if any
+ transaction_id TEXT -- The transaction id used to send the event, if any
+);
+-- for event selection
+CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id);
+`
+
+const insertEventSQL = "" +
+ "INSERT INTO syncapi_output_room_events (" +
+ "room_id, event_id, event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id" +
+ ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id"
+
+const selectEventsSQL = "" +
+ "SELECT id, event_json, session_id, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)"
+
+const selectRecentEventsSQL = "" +
+ "SELECT id, event_json, session_id, transaction_id FROM syncapi_output_room_events" +
+ " WHERE room_id = $1 AND id > $2 AND id <= $3" +
+ " ORDER BY id DESC LIMIT $4"
+
+const selectMaxEventIDSQL = "" +
+ "SELECT MAX(id) FROM syncapi_output_room_events"
+
+// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
+const selectStateInRangeSQL = "" +
+ "SELECT id, event_json, add_state_ids, remove_state_ids" +
+ " FROM syncapi_output_room_events" +
+ " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
+ " AND ( $3::text[] IS NULL OR sender = ANY($3) )" +
+ " AND ( $4::text[] IS NULL OR NOT(sender = ANY($4)) )" +
+ " AND ( $5::text[] IS NULL OR type LIKE ANY($5) )" +
+ " AND ( $6::text[] IS NULL OR NOT(type LIKE ANY($6)) )" +
+ " AND ( $7::bool IS NULL OR contains_url = $7 )" +
+ " ORDER BY id ASC" +
+ " LIMIT $8"
+
+type outputRoomEventsStatements struct {
+ insertEventStmt *sql.Stmt
+ selectEventsStmt *sql.Stmt
+ selectMaxEventIDStmt *sql.Stmt
+ selectRecentEventsStmt *sql.Stmt
+ selectStateInRangeStmt *sql.Stmt
+}
+
+func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(outputRoomEventsSchema)
+ if err != nil {
+ return
+ }
+ if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
+ return
+ }
+ if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil {
+ return
+ }
+ if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil {
+ return
+ }
+ if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
+ return
+ }
+ if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
+ return
+ }
+ return
+}
+
+// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
+// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
+// two positions, only the most recent state is returned.
+func (s *outputRoomEventsStatements) selectStateInRange(
+ ctx context.Context, txn *sql.Tx, oldPos, newPos int64,
+ stateFilterPart *gomatrixserverlib.FilterPart,
+) (map[string]map[string]bool, map[string]streamEvent, error) {
+ stmt := common.TxStmt(txn, s.selectStateInRangeStmt)
+
+ rows, err := stmt.QueryContext(
+ ctx, oldPos, newPos,
+ pq.StringArray(stateFilterPart.Senders),
+ pq.StringArray(stateFilterPart.NotSenders),
+ pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)),
+ pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)),
+ stateFilterPart.ContainsURL,
+ stateFilterPart.Limit,
+ )
+ if err != nil {
+ return nil, nil, err
+ }
+ // Fetch all the state change events for all rooms between the two positions then loop each event and:
+ // - Keep a cache of the event by ID (99% of state change events are for the event itself)
+ // - For each room ID, build up an array of event IDs which represents cumulative adds/removes
+ // For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID
+ // if they aren't in the event ID cache. We don't handle state deletion yet.
+ eventIDToEvent := make(map[string]streamEvent)
+
+ // RoomID => A set (map[string]bool) of state event IDs which are between the two positions
+ stateNeeded := make(map[string]map[string]bool)
+
+ for rows.Next() {
+ var (
+ streamPos int64
+ eventBytes []byte
+ addIDs pq.StringArray
+ delIDs pq.StringArray
+ )
+ if err := rows.Scan(&streamPos, &eventBytes, &addIDs, &delIDs); err != nil {
+ return nil, nil, err
+ }
+ // Sanity check for deleted state and whine if we see it. We don't need to do anything
+ // since it'll just mark the event as not being needed.
+ if len(addIDs) < len(delIDs) {
+ log.WithFields(log.Fields{
+ "since": oldPos,
+ "current": newPos,
+ "adds": addIDs,
+ "dels": delIDs,
+ }).Warn("StateBetween: ignoring deleted state")
+ }
+
+ // TODO: Handle redacted events
+ ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
+ if err != nil {
+ return nil, nil, err
+ }
+ needSet := stateNeeded[ev.RoomID()]
+ if needSet == nil { // make set if required
+ needSet = make(map[string]bool)
+ }
+ for _, id := range delIDs {
+ needSet[id] = false
+ }
+ for _, id := range addIDs {
+ needSet[id] = true
+ }
+ stateNeeded[ev.RoomID()] = needSet
+
+ eventIDToEvent[ev.EventID()] = streamEvent{
+ Event: ev,
+ streamPosition: streamPos,
+ }
+ }
+
+ return stateNeeded, eventIDToEvent, nil
+}
+
+// MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied,
+// then this function should only ever be used at startup, as it will race with inserting events if it is
+// done afterwards. If there are no inserted events, 0 is returned.
+func (s *outputRoomEventsStatements) selectMaxEventID(
+ ctx context.Context, txn *sql.Tx,
+) (id int64, err error) {
+ var nullableID sql.NullInt64
+ stmt := common.TxStmt(txn, s.selectMaxEventIDStmt)
+ err = stmt.QueryRowContext(ctx).Scan(&nullableID)
+ if nullableID.Valid {
+ id = nullableID.Int64
+ }
+ return
+}
+
+// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. Returns the position
+// of the inserted event.
+func (s *outputRoomEventsStatements) insertEvent(
+ ctx context.Context, txn *sql.Tx,
+ event *gomatrixserverlib.Event, addState, removeState []string,
+ transactionID *api.TransactionID,
+) (streamPos int64, err error) {
+ var txnID *string
+ var sessionID *int64
+ if transactionID != nil {
+ sessionID = &transactionID.SessionID
+ txnID = &transactionID.TransactionID
+ }
+
+ // Parse content as JSON and search for an "url" key
+ containsURL := false
+ var content map[string]interface{}
+ if json.Unmarshal(event.Content(), &content) != nil {
+ // Set containsURL to true if url is present
+ _, containsURL = content["url"]
+ }
+
+ stmt := common.TxStmt(txn, s.insertEventStmt)
+ err = stmt.QueryRowContext(
+ ctx,
+ event.RoomID(),
+ event.EventID(),
+ event.JSON(),
+ event.Type(),
+ event.Sender(),
+ containsURL,
+ pq.StringArray(addState),
+ pq.StringArray(removeState),
+ sessionID,
+ txnID,
+ ).Scan(&streamPos)
+ return
+}
+
+// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'.
+func (s *outputRoomEventsStatements) selectRecentEvents(
+ ctx context.Context, txn *sql.Tx,
+ roomID string, fromPos, toPos int64, limit int,
+) ([]streamEvent, error) {
+ stmt := common.TxStmt(txn, s.selectRecentEventsStmt)
+ rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+ events, err := rowsToStreamEvents(rows)
+ if err != nil {
+ return nil, err
+ }
+ // The events need to be returned from oldest to latest, which isn't
+ // necessarily the way the SQL query returns them, so a sort is necessary to
+ // ensure the events are in the right order in the slice.
+ sort.SliceStable(events, func(i int, j int) bool {
+ return events[i].streamPosition < events[j].streamPosition
+ })
+ return events, nil
+}
+
+// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing
+// from the database.
+func (s *outputRoomEventsStatements) selectEvents(
+ ctx context.Context, txn *sql.Tx, eventIDs []string,
+) ([]streamEvent, error) {
+ stmt := common.TxStmt(txn, s.selectEventsStmt)
+ rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs))
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+ return rowsToStreamEvents(rows)
+}
+
+func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) {
+ var result []streamEvent
+ for rows.Next() {
+ var (
+ streamPos int64
+ eventBytes []byte
+ sessionID *int64
+ txnID *string
+ transactionID *api.TransactionID
+ )
+ if err := rows.Scan(&streamPos, &eventBytes, &sessionID, &txnID); err != nil {
+ return nil, err
+ }
+ // TODO: Handle redacted events
+ ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
+ if err != nil {
+ return nil, err
+ }
+
+ if sessionID != nil && txnID != nil {
+ transactionID = &api.TransactionID{
+ SessionID: *sessionID,
+ TransactionID: *txnID,
+ }
+ }
+
+ result = append(result, streamEvent{
+ Event: ev,
+ streamPosition: streamPos,
+ transactionID: transactionID,
+ })
+ }
+ return result, nil
+}
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
new file mode 100644
index 00000000..fc7b4e40
--- /dev/null
+++ b/syncapi/storage/postgres/syncserver.go
@@ -0,0 +1,948 @@
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+ "fmt"
+ "strconv"
+ "time"
+
+ "github.com/sirupsen/logrus"
+
+ "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
+ "github.com/matrix-org/dendrite/roomserver/api"
+
+ // Import the postgres database driver.
+ _ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/dendrite/typingserver/cache"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type stateDelta struct {
+ roomID string
+ stateEvents []gomatrixserverlib.Event
+ membership string
+ // The PDU stream position of the latest membership event for this user, if applicable.
+ // Can be 0 if there is no membership event in this delta.
+ membershipPos int64
+}
+
+// Same as gomatrixserverlib.Event but also has the PDU stream position for this event.
+type streamEvent struct {
+ gomatrixserverlib.Event
+ streamPosition int64
+ transactionID *api.TransactionID
+}
+
+// SyncServerDatabase represents a sync server datasource which manages
+// both the database for PDUs and caches for EDUs.
+type SyncServerDatasource struct {
+ db *sql.DB
+ common.PartitionOffsetStatements
+ accountData accountDataStatements
+ events outputRoomEventsStatements
+ roomstate currentRoomStateStatements
+ invites inviteEventsStatements
+ typingCache *cache.TypingCache
+}
+
+// NewSyncServerDatabase creates a new sync server database
+func NewSyncServerDatasource(dbDataSourceName string) (*SyncServerDatasource, error) {
+ var d SyncServerDatasource
+ var err error
+ if d.db, err = sql.Open("postgres", dbDataSourceName); err != nil {
+ return nil, err
+ }
+ if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil {
+ return nil, err
+ }
+ if err = d.accountData.prepare(d.db); err != nil {
+ return nil, err
+ }
+ if err = d.events.prepare(d.db); err != nil {
+ return nil, err
+ }
+ if err := d.roomstate.prepare(d.db); err != nil {
+ return nil, err
+ }
+ if err := d.invites.prepare(d.db); err != nil {
+ return nil, err
+ }
+ d.typingCache = cache.NewTypingCache()
+ return &d, nil
+}
+
+// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
+func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) {
+ return d.roomstate.selectJoinedUsers(ctx)
+}
+
+// Events lookups a list of event by their event ID.
+// Returns a list of events matching the requested IDs found in the database.
+// If an event is not found in the database then it will be omitted from the list.
+// Returns an error if there was a problem talking with the database.
+// Does not include any transaction IDs in the returned events.
+func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) {
+ streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs)
+ if err != nil {
+ return nil, err
+ }
+
+ // We don't include a device here as we only include transaction IDs in
+ // incremental syncs.
+ return streamEventsToEvents(nil, streamEvents), nil
+}
+
+// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
+// when generating the sync stream position for this event. Returns the sync stream position for the inserted event.
+// Returns an error if there was a problem inserting this event.
+func (d *SyncServerDatasource) WriteEvent(
+ ctx context.Context,
+ ev *gomatrixserverlib.Event,
+ addStateEvents []gomatrixserverlib.Event,
+ addStateEventIDs, removeStateEventIDs []string,
+ transactionID *api.TransactionID,
+) (pduPosition int64, returnErr error) {
+ returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error {
+ var err error
+ pos, err := d.events.insertEvent(ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID)
+ if err != nil {
+ return err
+ }
+ pduPosition = pos
+
+ if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 {
+ // Nothing to do, the event may have just been a message event.
+ return nil
+ }
+
+ return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition)
+ })
+ return
+}
+
+func (d *SyncServerDatasource) updateRoomState(
+ ctx context.Context, txn *sql.Tx,
+ removedEventIDs []string,
+ addedEvents []gomatrixserverlib.Event,
+ pduPosition int64,
+) error {
+ // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
+ for _, eventID := range removedEventIDs {
+ if err := d.roomstate.deleteRoomStateByEventID(ctx, txn, eventID); err != nil {
+ return err
+ }
+ }
+
+ for _, event := range addedEvents {
+ if event.StateKey() == nil {
+ // ignore non state events
+ continue
+ }
+ var membership *string
+ if event.Type() == "m.room.member" {
+ value, err := event.Membership()
+ if err != nil {
+ return err
+ }
+ membership = &value
+ }
+ if err := d.roomstate.upsertRoomState(ctx, txn, event, membership, pduPosition); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key
+// If no event could be found, returns nil
+// If there was an issue during the retrieval, returns an error
+func (d *SyncServerDatasource) GetStateEvent(
+ ctx context.Context, roomID, evType, stateKey string,
+) (*gomatrixserverlib.Event, error) {
+ return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey)
+}
+
+// GetStateEventsForRoom fetches the state events for a given room.
+// Returns an empty slice if no state events could be found for this room.
+// Returns an error if there was an issue with the retrieval.
+func (d *SyncServerDatasource) GetStateEventsForRoom(
+ ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.FilterPart,
+) (stateEvents []gomatrixserverlib.Event, err error) {
+ err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
+ stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilterPart)
+ return err
+ })
+ return
+}
+
+// SyncPosition returns the latest positions for syncing.
+func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.SyncPosition, error) {
+ return d.syncPositionTx(ctx, nil)
+}
+
+func (d *SyncServerDatasource) syncPositionTx(
+ ctx context.Context, txn *sql.Tx,
+) (sp types.SyncPosition, err error) {
+
+ maxEventID, err := d.events.selectMaxEventID(ctx, txn)
+ if err != nil {
+ return sp, err
+ }
+ maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn)
+ if err != nil {
+ return sp, err
+ }
+ if maxAccountDataID > maxEventID {
+ maxEventID = maxAccountDataID
+ }
+ maxInviteID, err := d.invites.selectMaxInviteID(ctx, txn)
+ if err != nil {
+ return sp, err
+ }
+ if maxInviteID > maxEventID {
+ maxEventID = maxInviteID
+ }
+ sp.PDUPosition = maxEventID
+
+ sp.TypingPosition = d.typingCache.GetLatestSyncPosition()
+
+ return
+}
+
+// addPDUDeltaToResponse adds all PDU deltas to a sync response.
+// IDs of all rooms the user joined are returned so EDU deltas can be added for them.
+func (d *SyncServerDatasource) addPDUDeltaToResponse(
+ ctx context.Context,
+ device authtypes.Device,
+ fromPos, toPos int64,
+ numRecentEventsPerRoom int,
+ wantFullState bool,
+ res *types.Response,
+) ([]string, error) {
+ txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
+ if err != nil {
+ return nil, err
+ }
+ var succeeded bool
+ defer common.EndTransaction(txn, &succeeded)
+
+ stateFilterPart := gomatrixserverlib.DefaultFilterPart() // TODO: use filter provided in request
+
+ // Work out which rooms to return in the response. This is done by getting not only the currently
+ // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions.
+ // This works out what the 'state' key should be for each room as well as which membership block
+ // to put the room into.
+ var deltas []stateDelta
+ var joinedRoomIDs []string
+ if !wantFullState {
+ deltas, joinedRoomIDs, err = d.getStateDeltas(
+ ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilterPart,
+ )
+ } else {
+ deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(
+ ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilterPart,
+ )
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ for _, delta := range deltas {
+ err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ // TODO: This should be done in getStateDeltas
+ if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil {
+ return nil, err
+ }
+
+ succeeded = true
+ return joinedRoomIDs, nil
+}
+
+// addTypingDeltaToResponse adds all typing notifications to a sync response
+// since the specified position.
+func (d *SyncServerDatasource) addTypingDeltaToResponse(
+ since int64,
+ joinedRoomIDs []string,
+ res *types.Response,
+) error {
+ var jr types.JoinResponse
+ var ok bool
+ var err error
+ for _, roomID := range joinedRoomIDs {
+ if typingUsers, updated := d.typingCache.GetTypingUsersIfUpdatedAfter(
+ roomID, since,
+ ); updated {
+ ev := gomatrixserverlib.ClientEvent{
+ Type: gomatrixserverlib.MTyping,
+ }
+ ev.Content, err = json.Marshal(map[string]interface{}{
+ "user_ids": typingUsers,
+ })
+ if err != nil {
+ return err
+ }
+
+ if jr, ok = res.Rooms.Join[roomID]; !ok {
+ jr = *types.NewJoinResponse()
+ }
+ jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
+ res.Rooms.Join[roomID] = jr
+ }
+ }
+ return nil
+}
+
+// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if
+// the positions of that type are not equal in fromPos and toPos.
+func (d *SyncServerDatasource) addEDUDeltaToResponse(
+ fromPos, toPos types.SyncPosition,
+ joinedRoomIDs []string,
+ res *types.Response,
+) (err error) {
+
+ if fromPos.TypingPosition != toPos.TypingPosition {
+ err = d.addTypingDeltaToResponse(
+ fromPos.TypingPosition, joinedRoomIDs, res,
+ )
+ }
+
+ return
+}
+
+// IncrementalSync returns all the data needed in order to create an incremental
+// sync response for the given user. Events returned will include any client
+// transaction IDs associated with the given device. These transaction IDs come
+// from when the device sent the event via an API that included a transaction
+// ID.
+func (d *SyncServerDatasource) IncrementalSync(
+ ctx context.Context,
+ device authtypes.Device,
+ fromPos, toPos types.SyncPosition,
+ numRecentEventsPerRoom int,
+ wantFullState bool,
+) (*types.Response, error) {
+ nextBatchPos := fromPos.WithUpdates(toPos)
+ res := types.NewResponse(nextBatchPos)
+
+ var joinedRoomIDs []string
+ var err error
+ if fromPos.PDUPosition != toPos.PDUPosition || wantFullState {
+ joinedRoomIDs, err = d.addPDUDeltaToResponse(
+ ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, wantFullState, res,
+ )
+ } else {
+ joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(
+ ctx, nil, device.UserID, gomatrixserverlib.Join,
+ )
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ err = d.addEDUDeltaToResponse(
+ fromPos, toPos, joinedRoomIDs, res,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ return res, nil
+}
+
+// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed
+// to it. It returns toPos and joinedRoomIDs for use of adding EDUs.
+func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
+ ctx context.Context,
+ userID string,
+ numRecentEventsPerRoom int,
+) (
+ res *types.Response,
+ toPos types.SyncPosition,
+ joinedRoomIDs []string,
+ err error,
+) {
+ // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
+ // a consistent view of the database throughout. This includes extracting the sync position.
+ // This does have the unfortunate side-effect that all the matrixy logic resides in this function,
+ // but it's better to not hide the fact that this is being done in a transaction.
+ txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
+ if err != nil {
+ return
+ }
+ var succeeded bool
+ defer common.EndTransaction(txn, &succeeded)
+
+ // Get the current sync position which we will base the sync response on.
+ toPos, err = d.syncPositionTx(ctx, txn)
+ if err != nil {
+ return
+ }
+
+ res = types.NewResponse(toPos)
+
+ // Extract room state and recent events for all rooms the user is joined to.
+ joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
+ if err != nil {
+ return
+ }
+
+ stateFilterPart := gomatrixserverlib.DefaultFilterPart() // TODO: use filter provided in request
+
+ // Build up a /sync response. Add joined rooms.
+ for _, roomID := range joinedRoomIDs {
+ var stateEvents []gomatrixserverlib.Event
+ stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilterPart)
+ if err != nil {
+ return
+ }
+ // TODO: When filters are added, we may need to call this multiple times to get enough events.
+ // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
+ var recentStreamEvents []streamEvent
+ recentStreamEvents, err = d.events.selectRecentEvents(
+ ctx, txn, roomID, 0, toPos.PDUPosition, numRecentEventsPerRoom,
+ )
+ if err != nil {
+ return
+ }
+
+ // We don't include a device here as we don't need to send down
+ // transaction IDs for complete syncs
+ recentEvents := streamEventsToEvents(nil, recentStreamEvents)
+
+ stateEvents = removeDuplicates(stateEvents, recentEvents)
+ jr := types.NewJoinResponse()
+ if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 {
+ // Use the short form of batch token for prev_batch
+ jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
+ } else {
+ // Use the short form of batch token for prev_batch
+ jr.Timeline.PrevBatch = "1"
+ }
+ jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+ jr.Timeline.Limited = true
+ jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
+ res.Rooms.Join[roomID] = *jr
+ }
+
+ if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition, res); err != nil {
+ return
+ }
+
+ succeeded = true
+ return res, toPos, joinedRoomIDs, err
+}
+
+// CompleteSync returns a complete /sync API response for the given user.
+func (d *SyncServerDatasource) CompleteSync(
+ ctx context.Context, userID string, numRecentEventsPerRoom int,
+) (*types.Response, error) {
+ res, toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync(
+ ctx, userID, numRecentEventsPerRoom,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ // Use a zero value SyncPosition for fromPos so all EDU states are added.
+ err = d.addEDUDeltaToResponse(
+ types.SyncPosition{}, toPos, joinedRoomIDs, res,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ return res, nil
+}
+
+var txReadOnlySnapshot = sql.TxOptions{
+ // Set the isolation level so that we see a snapshot of the database.
+ // In PostgreSQL repeatable read transactions will see a snapshot taken
+ // at the first query, and since the transaction is read-only it can't
+ // run into any serialisation errors.
+ // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ
+ Isolation: sql.LevelRepeatableRead,
+ ReadOnly: true,
+}
+
+// GetAccountDataInRange returns all account data for a given user inserted or
+// updated between two given positions
+// Returns a map following the format data[roomID] = []dataTypes
+// If no data is retrieved, returns an empty map
+// If there was an issue with the retrieval, returns an error
+func (d *SyncServerDatasource) GetAccountDataInRange(
+ ctx context.Context, userID string, oldPos, newPos int64,
+ accountDataFilterPart *gomatrixserverlib.FilterPart,
+) (map[string][]string, error) {
+ return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart)
+}
+
+// UpsertAccountData keeps track of new or updated account data, by saving the type
+// of the new/updated data, and the user ID and room ID the data is related to (empty)
+// room ID means the data isn't specific to any room)
+// If no data with the given type, user ID and room ID exists in the database,
+// creates a new row, else update the existing one
+// Returns an error if there was an issue with the upsert
+func (d *SyncServerDatasource) UpsertAccountData(
+ ctx context.Context, userID, roomID, dataType string,
+) (int64, error) {
+ return d.accountData.insertAccountData(ctx, userID, roomID, dataType)
+}
+
+// AddInviteEvent stores a new invite event for a user.
+// If the invite was successfully stored this returns the stream ID it was stored at.
+// Returns an error if there was a problem communicating with the database.
+func (d *SyncServerDatasource) AddInviteEvent(
+ ctx context.Context, inviteEvent gomatrixserverlib.Event,
+) (int64, error) {
+ return d.invites.insertInviteEvent(ctx, inviteEvent)
+}
+
+// RetireInviteEvent removes an old invite event from the database.
+// Returns an error if there was a problem communicating with the database.
+func (d *SyncServerDatasource) RetireInviteEvent(
+ ctx context.Context, inviteEventID string,
+) error {
+ // TODO: Record that invite has been retired in a stream so that we can
+ // notify the user in an incremental sync.
+ err := d.invites.deleteInviteEvent(ctx, inviteEventID)
+ return err
+}
+
+func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) {
+ d.typingCache.SetTimeoutCallback(fn)
+}
+
+// AddTypingUser adds a typing user to the typing cache.
+// Returns the newly calculated sync position for typing notifications.
+func (d *SyncServerDatasource) AddTypingUser(
+ userID, roomID string, expireTime *time.Time,
+) int64 {
+ return d.typingCache.AddTypingUser(userID, roomID, expireTime)
+}
+
+// RemoveTypingUser removes a typing user from the typing cache.
+// Returns the newly calculated sync position for typing notifications.
+func (d *SyncServerDatasource) RemoveTypingUser(
+ userID, roomID string,
+) int64 {
+ return d.typingCache.RemoveUser(userID, roomID)
+}
+
+func (d *SyncServerDatasource) addInvitesToResponse(
+ ctx context.Context, txn *sql.Tx,
+ userID string,
+ fromPos, toPos int64,
+ res *types.Response,
+) error {
+ invites, err := d.invites.selectInviteEventsInRange(
+ ctx, txn, userID, int64(fromPos), int64(toPos),
+ )
+ if err != nil {
+ return err
+ }
+ for roomID, inviteEvent := range invites {
+ ir := types.NewInviteResponse()
+ ir.InviteState.Events = gomatrixserverlib.ToClientEvents(
+ []gomatrixserverlib.Event{inviteEvent}, gomatrixserverlib.FormatSync,
+ )
+ // TODO: add the invite state from the invite event.
+ res.Rooms.Invite[roomID] = *ir
+ }
+ return nil
+}
+
+// addRoomDeltaToResponse adds a room state delta to a sync response
+func (d *SyncServerDatasource) addRoomDeltaToResponse(
+ ctx context.Context,
+ device *authtypes.Device,
+ txn *sql.Tx,
+ fromPos, toPos int64,
+ delta stateDelta,
+ numRecentEventsPerRoom int,
+ res *types.Response,
+) error {
+ endPos := toPos
+ if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave {
+ // make sure we don't leak recent events after the leave event.
+ // TODO: History visibility makes this somewhat complex to handle correctly. For example:
+ // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
+ // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
+ // in a single /sync request
+ // This is all "okay" assuming history_visibility == "shared" which it is by default.
+ endPos = delta.membershipPos
+ }
+ recentStreamEvents, err := d.events.selectRecentEvents(
+ ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom,
+ )
+ if err != nil {
+ return err
+ }
+ recentEvents := streamEventsToEvents(device, recentStreamEvents)
+ delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
+
+ var prevPDUPos int64
+
+ if len(recentEvents) == 0 {
+ if len(delta.stateEvents) == 0 {
+ // Don't bother appending empty room entries
+ return nil
+ }
+
+ // If full_state=true and since is already up to date, then we'll have
+ // state events but no recent events.
+ prevPDUPos = toPos - 1
+ } else {
+ prevPDUPos = recentStreamEvents[0].streamPosition - 1
+ }
+
+ if prevPDUPos <= 0 {
+ prevPDUPos = 1
+ }
+
+ switch delta.membership {
+ case gomatrixserverlib.Join:
+ jr := types.NewJoinResponse()
+ // Use the short form of batch token for prev_batch
+ jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
+ jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+ jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
+ jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
+ res.Rooms.Join[delta.roomID] = *jr
+ case gomatrixserverlib.Leave:
+ fallthrough // transitions to leave are the same as ban
+ case gomatrixserverlib.Ban:
+ // TODO: recentEvents may contain events that this user is not allowed to see because they are
+ // no longer in the room.
+ lr := types.NewLeaveResponse()
+ // Use the short form of batch token for prev_batch
+ lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
+ lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+ lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
+ lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
+ res.Rooms.Leave[delta.roomID] = *lr
+ }
+
+ return nil
+}
+
+// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database.
+// Returns a map of room ID to list of events.
+func (d *SyncServerDatasource) fetchStateEvents(
+ ctx context.Context, txn *sql.Tx,
+ roomIDToEventIDSet map[string]map[string]bool,
+ eventIDToEvent map[string]streamEvent,
+) (map[string][]streamEvent, error) {
+ stateBetween := make(map[string][]streamEvent)
+ missingEvents := make(map[string][]string)
+ for roomID, ids := range roomIDToEventIDSet {
+ events := stateBetween[roomID]
+ for id, need := range ids {
+ if !need {
+ continue // deleted state
+ }
+ e, ok := eventIDToEvent[id]
+ if ok {
+ events = append(events, e)
+ } else {
+ m := missingEvents[roomID]
+ m = append(m, id)
+ missingEvents[roomID] = m
+ }
+ }
+ stateBetween[roomID] = events
+ }
+
+ if len(missingEvents) > 0 {
+ // This happens when add_state_ids has an event ID which is not in the provided range.
+ // We need to explicitly fetch them.
+ allMissingEventIDs := []string{}
+ for _, missingEvIDs := range missingEvents {
+ allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...)
+ }
+ evs, err := d.fetchMissingStateEvents(ctx, txn, allMissingEventIDs)
+ if err != nil {
+ return nil, err
+ }
+ // we know we got them all otherwise an error would've been returned, so just loop the events
+ for _, ev := range evs {
+ roomID := ev.RoomID()
+ stateBetween[roomID] = append(stateBetween[roomID], ev)
+ }
+ }
+ return stateBetween, nil
+}
+
+func (d *SyncServerDatasource) fetchMissingStateEvents(
+ ctx context.Context, txn *sql.Tx, eventIDs []string,
+) ([]streamEvent, error) {
+ // Fetch from the events table first so we pick up the stream ID for the
+ // event.
+ events, err := d.events.selectEvents(ctx, txn, eventIDs)
+ if err != nil {
+ return nil, err
+ }
+
+ have := map[string]bool{}
+ for _, event := range events {
+ have[event.EventID()] = true
+ }
+ var missing []string
+ for _, eventID := range eventIDs {
+ if !have[eventID] {
+ missing = append(missing, eventID)
+ }
+ }
+ if len(missing) == 0 {
+ return events, nil
+ }
+
+ // If they are missing from the events table then they should be state
+ // events that we received from outside the main event stream.
+ // These should be in the room state table.
+ stateEvents, err := d.roomstate.selectEventsWithEventIDs(ctx, txn, missing)
+
+ if err != nil {
+ return nil, err
+ }
+ if len(stateEvents) != len(missing) {
+ return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing))
+ }
+ events = append(events, stateEvents...)
+ return events, nil
+}
+
+// getStateDeltas returns the state deltas between fromPos and toPos,
+// exclusive of oldPos, inclusive of newPos, for the rooms in which
+// the user has new membership events.
+// A list of joined room IDs is also returned in case the caller needs it.
+func (d *SyncServerDatasource) getStateDeltas(
+ ctx context.Context, device *authtypes.Device, txn *sql.Tx,
+ fromPos, toPos int64, userID string,
+ stateFilterPart *gomatrixserverlib.FilterPart,
+) ([]stateDelta, []string, error) {
+ // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
+ // - Get membership list changes for this user in this sync response
+ // - For each room which has membership list changes:
+ // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO).
+ // If it is, then we need to send the full room state down (and 'limited' is always true).
+ // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
+ // * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block.
+ // - Get all CURRENTLY joined rooms, and add them to 'joined' block.
+ var deltas []stateDelta
+
+ // get all the state events ever between these two positions
+ stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart)
+ if err != nil {
+ return nil, nil, err
+ }
+ state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ for roomID, stateStreamEvents := range state {
+ for _, ev := range stateStreamEvents {
+ // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
+ // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this,
+ // dupe join events will result in the entire room state coming down to the client again. This is added in
+ // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
+ // the timeline.
+ if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
+ if membership == gomatrixserverlib.Join {
+ // send full room state down instead of a delta
+ var s []streamEvent
+ s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilterPart)
+ if err != nil {
+ return nil, nil, err
+ }
+ state[roomID] = s
+ continue // we'll add this room in when we do joined rooms
+ }
+
+ deltas = append(deltas, stateDelta{
+ membership: membership,
+ membershipPos: ev.streamPosition,
+ stateEvents: streamEventsToEvents(device, stateStreamEvents),
+ roomID: roomID,
+ })
+ break
+ }
+ }
+ }
+
+ // Add in currently joined rooms
+ joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
+ if err != nil {
+ return nil, nil, err
+ }
+ for _, joinedRoomID := range joinedRoomIDs {
+ deltas = append(deltas, stateDelta{
+ membership: gomatrixserverlib.Join,
+ stateEvents: streamEventsToEvents(device, state[joinedRoomID]),
+ roomID: joinedRoomID,
+ })
+ }
+
+ return deltas, joinedRoomIDs, nil
+}
+
+// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync
+// requests with full_state=true.
+// Fetches full state for all joined rooms and uses selectStateInRange to get
+// updates for other rooms.
+func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
+ ctx context.Context, device *authtypes.Device, txn *sql.Tx,
+ fromPos, toPos int64, userID string,
+ stateFilterPart *gomatrixserverlib.FilterPart,
+) ([]stateDelta, []string, error) {
+ joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // Use a reasonable initial capacity
+ deltas := make([]stateDelta, 0, len(joinedRoomIDs))
+
+ // Add full states for all joined rooms
+ for _, joinedRoomID := range joinedRoomIDs {
+ s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, stateFilterPart)
+ if stateErr != nil {
+ return nil, nil, stateErr
+ }
+ deltas = append(deltas, stateDelta{
+ membership: gomatrixserverlib.Join,
+ stateEvents: streamEventsToEvents(device, s),
+ roomID: joinedRoomID,
+ })
+ }
+
+ // Get all the state events ever between these two positions
+ stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart)
+ if err != nil {
+ return nil, nil, err
+ }
+ state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ for roomID, stateStreamEvents := range state {
+ for _, ev := range stateStreamEvents {
+ if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
+ if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above.
+ deltas = append(deltas, stateDelta{
+ membership: membership,
+ membershipPos: ev.streamPosition,
+ stateEvents: streamEventsToEvents(device, stateStreamEvents),
+ roomID: roomID,
+ })
+ }
+
+ break
+ }
+ }
+ }
+
+ return deltas, joinedRoomIDs, nil
+}
+
+func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
+ ctx context.Context, txn *sql.Tx, roomID string,
+ stateFilterPart *gomatrixserverlib.FilterPart,
+) ([]streamEvent, error) {
+ allState, err := d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilterPart)
+ if err != nil {
+ return nil, err
+ }
+ s := make([]streamEvent, len(allState))
+ for i := 0; i < len(s); i++ {
+ s[i] = streamEvent{Event: allState[i], streamPosition: 0}
+ }
+ return s, nil
+}
+
+// streamEventsToEvents converts streamEvent to Event. If device is non-nil and
+// matches the streamevent.transactionID device then the transaction ID gets
+// added to the unsigned section of the output event.
+func streamEventsToEvents(device *authtypes.Device, in []streamEvent) []gomatrixserverlib.Event {
+ out := make([]gomatrixserverlib.Event, len(in))
+ for i := 0; i < len(in); i++ {
+ out[i] = in[i].Event
+ if device != nil && in[i].transactionID != nil {
+ if device.UserID == in[i].Sender() && device.SessionID == in[i].transactionID.SessionID {
+ err := out[i].SetUnsignedField(
+ "transaction_id", in[i].transactionID.TransactionID,
+ )
+ if err != nil {
+ logrus.WithFields(logrus.Fields{
+ "event_id": out[i].EventID(),
+ }).WithError(err).Warnf("Failed to add transaction ID to event")
+ }
+ }
+ }
+ }
+ return out
+}
+
+// There may be some overlap where events in stateEvents are already in recentEvents, so filter
+// them out so we don't include them twice in the /sync response. They should be in recentEvents
+// only, so clients get to the correct state once they have rolled forward.
+func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gomatrixserverlib.Event {
+ for _, recentEv := range recentEvents {
+ if recentEv.StateKey() == nil {
+ continue // not a state event
+ }
+ // TODO: This is a linear scan over all the current state events in this room. This will
+ // be slow for big rooms. We should instead sort the state events by event ID (ORDER BY)
+ // then do a binary search to find matching events, similar to what roomserver does.
+ for j := 0; j < len(stateEvents); j++ {
+ if stateEvents[j].EventID() == recentEv.EventID() {
+ // overwrite the element to remove with the last element then pop the last element.
+ // This is orders of magnitude faster than re-slicing, but doesn't preserve ordering
+ // (we don't care about the order of stateEvents)
+ stateEvents[j] = stateEvents[len(stateEvents)-1]
+ stateEvents = stateEvents[:len(stateEvents)-1]
+ break // there shouldn't be multiple events with the same event ID
+ }
+ }
+ }
+ return stateEvents
+}
+
+// getMembershipFromEvent returns the value of content.membership iff the event is a state event
+// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
+func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
+ if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
+ membership, err := ev.Membership()
+ if err != nil {
+ return ""
+ }
+ return membership
+ }
+ return ""
+}