aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-06-26 11:07:52 +0100
committerGitHub <noreply@github.com>2020-06-26 11:07:52 +0100
commit4897beabeed3281f3e45a1426e6f1c9359e3152b (patch)
tree4746aecb87086d81de065ec30283c8ec48ff9f36 /syncapi
parentc1d2382e6d7f459ddf911a16aac7d4e63d50838b (diff)
Finish implementing retiring invites (#1166)
* Pass retired invites to the syncapi with the event ID of the invite * Implement retire invite streaming * Update whitelist
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/roomserver.go7
-rw-r--r--syncapi/storage/interface.go4
-rw-r--r--syncapi/storage/postgres/invites_table.go37
-rw-r--r--syncapi/storage/shared/syncserver.go13
-rw-r--r--syncapi/storage/sqlite3/invites_table.go42
-rw-r--r--syncapi/storage/storage_test.go77
-rw-r--r--syncapi/storage/tables/interface.go4
-rw-r--r--syncapi/types/types.go4
8 files changed, 142 insertions, 46 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 98be5bb7..af7f612b 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -157,7 +157,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
func (s *OutputRoomEventConsumer) onRetireInviteEvent(
ctx context.Context, msg api.OutputRetireInviteEvent,
) error {
- err := s.db.RetireInviteEvent(ctx, msg.EventID)
+ sp, err := s.db.RetireInviteEvent(ctx, msg.EventID)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
@@ -166,8 +166,9 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
}).Panicf("roomserver output log: remove invite failure")
return nil
}
- // TODO: Notify any active sync requests that the invite has been retired.
- // s.notifier.OnNewEvent(nil, msg.TargetUserID, syncStreamPos)
+ // Notify any active sync requests that the invite has been retired.
+ // Invites share the same stream counter as PDUs
+ s.notifier.OnNewEvent(nil, "", []string{msg.TargetUserID}, types.NewStreamToken(sp, 0))
return nil
}
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index 7b3bd678..c693326b 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -78,9 +78,9 @@ type Database interface {
// If the invite was successfully stored this returns the stream ID it was stored at.
// Returns an error if there was a problem communicating with the database.
AddInviteEvent(ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent) (types.StreamPosition, error)
- // RetireInviteEvent removes an old invite event from the database.
+ // RetireInviteEvent removes an old invite event from the database. Returns the new position of the retired invite.
// Returns an error if there was a problem communicating with the database.
- RetireInviteEvent(ctx context.Context, inviteEventID string) error
+ RetireInviteEvent(ctx context.Context, inviteEventID string) (types.StreamPosition, error)
// SetTypingTimeoutCallback sets a callback function that is called right after
// a user is removed from the typing user list due to timeout.
SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn)
diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go
index 5031d64e..530dc645 100644
--- a/syncapi/storage/postgres/invites_table.go
+++ b/syncapi/storage/postgres/invites_table.go
@@ -33,7 +33,8 @@ CREATE TABLE IF NOT EXISTS syncapi_invite_events (
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
target_user_id TEXT NOT NULL,
- headered_event_json TEXT NOT NULL
+ headered_event_json TEXT NOT NULL,
+ deleted BOOL NOT NULL
);
-- For looking up the invites for a given user.
@@ -47,14 +48,14 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx
const insertInviteEventSQL = "" +
"INSERT INTO syncapi_invite_events (" +
- " room_id, event_id, target_user_id, headered_event_json" +
- ") VALUES ($1, $2, $3, $4) RETURNING id"
+ " room_id, event_id, target_user_id, headered_event_json, deleted" +
+ ") VALUES ($1, $2, $3, $4, FALSE) RETURNING id"
const deleteInviteEventSQL = "" +
- "DELETE FROM syncapi_invite_events WHERE event_id = $1"
+ "UPDATE syncapi_invite_events SET deleted=TRUE, id=nextval('syncapi_stream_id') WHERE event_id = $1 RETURNING id"
const selectInviteEventsInRangeSQL = "" +
- "SELECT room_id, headered_event_json FROM syncapi_invite_events" +
+ "SELECT room_id, headered_event_json, deleted FROM syncapi_invite_events" +
" WHERE target_user_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id DESC"
@@ -110,40 +111,46 @@ func (s *inviteEventsStatements) InsertInviteEvent(
func (s *inviteEventsStatements) DeleteInviteEvent(
ctx context.Context, inviteEventID string,
-) error {
- _, err := s.deleteInviteEventStmt.ExecContext(ctx, inviteEventID)
- return err
+) (sp types.StreamPosition, err error) {
+ err = s.deleteInviteEventStmt.QueryRowContext(ctx, inviteEventID).Scan(&sp)
+ return
}
// selectInviteEventsInRange returns a map of room ID to invite event for the
// active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) SelectInviteEventsInRange(
ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range,
-) (map[string]gomatrixserverlib.HeaderedEvent, error) {
+) (map[string]gomatrixserverlib.HeaderedEvent, map[string]gomatrixserverlib.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectInviteEventsInRangeStmt)
rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High())
if err != nil {
- return nil, err
+ return nil, nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed")
result := map[string]gomatrixserverlib.HeaderedEvent{}
+ retired := map[string]gomatrixserverlib.HeaderedEvent{}
for rows.Next() {
var (
roomID string
eventJSON []byte
+ deleted bool
)
- if err = rows.Scan(&roomID, &eventJSON); err != nil {
- return nil, err
+ if err = rows.Scan(&roomID, &eventJSON, &deleted); err != nil {
+ return nil, nil, err
}
var event gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventJSON, &event); err != nil {
- return nil, err
+ return nil, nil, err
}
- result[roomID] = event
+ if deleted {
+ retired[roomID] = event
+ } else {
+ result[roomID] = event
+ }
}
- return result, rows.Err()
+ return result, retired, rows.Err()
}
func (s *inviteEventsStatements) SelectMaxInviteID(
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index 74ae3eab..f84dc341 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -180,11 +180,8 @@ func (d *Database) AddInviteEvent(
// Returns an error if there was a problem communicating with the database.
func (d *Database) RetireInviteEvent(
ctx context.Context, inviteEventID string,
-) error {
- // TODO: Record that invite has been retired in a stream so that we can
- // notify the user in an incremental sync.
- err := d.Invites.DeleteInviteEvent(ctx, inviteEventID)
- return err
+) (types.StreamPosition, error) {
+ return d.Invites.DeleteInviteEvent(ctx, inviteEventID)
}
// GetAccountDataInRange returns all account data for a given user inserted or
@@ -724,7 +721,7 @@ func (d *Database) addInvitesToResponse(
r types.Range,
res *types.Response,
) error {
- invites, err := d.Invites.SelectInviteEventsInRange(
+ invites, retiredInvites, err := d.Invites.SelectInviteEventsInRange(
ctx, txn, userID, r,
)
if err != nil {
@@ -734,6 +731,10 @@ func (d *Database) addInvitesToResponse(
ir := types.NewInviteResponse(inviteEvent)
res.Rooms.Invite[roomID] = *ir
}
+ for roomID := range retiredInvites {
+ lr := types.NewLeaveResponse()
+ res.Rooms.Leave[roomID] = *lr
+ }
return nil
}
diff --git a/syncapi/storage/sqlite3/invites_table.go b/syncapi/storage/sqlite3/invites_table.go
index bb58e345..aa051388 100644
--- a/syncapi/storage/sqlite3/invites_table.go
+++ b/syncapi/storage/sqlite3/invites_table.go
@@ -33,7 +33,8 @@ CREATE TABLE IF NOT EXISTS syncapi_invite_events (
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
target_user_id TEXT NOT NULL,
- headered_event_json TEXT NOT NULL
+ headered_event_json TEXT NOT NULL,
+ deleted BOOL NOT NULL
);
CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx ON syncapi_invite_events (target_user_id, id);
@@ -42,14 +43,14 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx ON syncapi_invite_events
const insertInviteEventSQL = "" +
"INSERT INTO syncapi_invite_events" +
- " (id, room_id, event_id, target_user_id, headered_event_json)" +
- " VALUES ($1, $2, $3, $4, $5)"
+ " (id, room_id, event_id, target_user_id, headered_event_json, deleted)" +
+ " VALUES ($1, $2, $3, $4, $5, false)"
const deleteInviteEventSQL = "" +
- "DELETE FROM syncapi_invite_events WHERE event_id = $1"
+ "UPDATE syncapi_invite_events SET deleted=true, id=$1 WHERE event_id = $2"
const selectInviteEventsInRangeSQL = "" +
- "SELECT room_id, headered_event_json FROM syncapi_invite_events" +
+ "SELECT room_id, headered_event_json, deleted FROM syncapi_invite_events" +
" WHERE target_user_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id DESC"
@@ -114,40 +115,49 @@ func (s *inviteEventsStatements) InsertInviteEvent(
func (s *inviteEventsStatements) DeleteInviteEvent(
ctx context.Context, inviteEventID string,
-) error {
- _, err := s.deleteInviteEventStmt.ExecContext(ctx, inviteEventID)
- return err
+) (types.StreamPosition, error) {
+ streamPos, err := s.streamIDStatements.nextStreamID(ctx, nil)
+ if err != nil {
+ return streamPos, err
+ }
+ _, err = s.deleteInviteEventStmt.ExecContext(ctx, streamPos, inviteEventID)
+ return streamPos, err
}
// selectInviteEventsInRange returns a map of room ID to invite event for the
// active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) SelectInviteEventsInRange(
ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range,
-) (map[string]gomatrixserverlib.HeaderedEvent, error) {
+) (map[string]gomatrixserverlib.HeaderedEvent, map[string]gomatrixserverlib.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectInviteEventsInRangeStmt)
rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High())
if err != nil {
- return nil, err
+ return nil, nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed")
result := map[string]gomatrixserverlib.HeaderedEvent{}
+ retired := map[string]gomatrixserverlib.HeaderedEvent{}
for rows.Next() {
var (
roomID string
eventJSON []byte
+ deleted bool
)
- if err = rows.Scan(&roomID, &eventJSON); err != nil {
- return nil, err
+ if err = rows.Scan(&roomID, &eventJSON, &deleted); err != nil {
+ return nil, nil, err
}
var event gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventJSON, &event); err != nil {
- return nil, err
+ return nil, nil, err
+ }
+ if deleted {
+ retired[roomID] = event
+ } else {
+ result[roomID] = event
}
-
- result[roomID] = event
}
- return result, nil
+ return result, retired, nil
}
func (s *inviteEventsStatements) SelectMaxInviteID(
diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go
index 85084fac..feacbc18 100644
--- a/syncapi/storage/storage_test.go
+++ b/syncapi/storage/storage_test.go
@@ -601,6 +601,83 @@ func TestSendToDeviceBehaviour(t *testing.T) {
}
}
+func TestInviteBehaviour(t *testing.T) {
+ db := MustCreateDatabase(t)
+ inviteRoom1 := "!inviteRoom1:somewhere"
+ inviteEvent1 := MustCreateEvent(t, inviteRoom1, nil, &gomatrixserverlib.EventBuilder{
+ Content: []byte(fmt.Sprintf(`{"membership":"invite"}`)),
+ Type: "m.room.member",
+ StateKey: &testUserIDA,
+ Sender: "@inviteUser1:somewhere",
+ })
+ inviteRoom2 := "!inviteRoom2:somewhere"
+ inviteEvent2 := MustCreateEvent(t, inviteRoom2, nil, &gomatrixserverlib.EventBuilder{
+ Content: []byte(fmt.Sprintf(`{"membership":"invite"}`)),
+ Type: "m.room.member",
+ StateKey: &testUserIDA,
+ Sender: "@inviteUser2:somewhere",
+ })
+ for _, ev := range []gomatrixserverlib.HeaderedEvent{inviteEvent1, inviteEvent2} {
+ _, err := db.AddInviteEvent(ctx, ev)
+ if err != nil {
+ t.Fatalf("Failed to AddInviteEvent: %s", err)
+ }
+ }
+ latest, err := db.SyncPosition(ctx)
+ if err != nil {
+ t.Fatalf("failed to get SyncPosition: %s", err)
+ }
+ // both invite events should appear in a new sync
+ beforeRetireRes := types.NewResponse()
+ beforeRetireRes, err = db.IncrementalSync(ctx, beforeRetireRes, testUserDeviceA, types.NewStreamToken(0, 0), latest, 0, false)
+ if err != nil {
+ t.Fatalf("IncrementalSync failed: %s", err)
+ }
+ assertInvitedToRooms(t, beforeRetireRes, []string{inviteRoom1, inviteRoom2})
+
+ // retire one event: a fresh sync should just return 1 invite room
+ if _, err = db.RetireInviteEvent(ctx, inviteEvent1.EventID()); err != nil {
+ t.Fatalf("Failed to RetireInviteEvent: %s", err)
+ }
+ latest, err = db.SyncPosition(ctx)
+ if err != nil {
+ t.Fatalf("failed to get SyncPosition: %s", err)
+ }
+ res := types.NewResponse()
+ res, err = db.IncrementalSync(ctx, res, testUserDeviceA, types.NewStreamToken(0, 0), latest, 0, false)
+ if err != nil {
+ t.Fatalf("IncrementalSync failed: %s", err)
+ }
+ assertInvitedToRooms(t, res, []string{inviteRoom2})
+
+ // a sync after we have received both invites should result in a leave for the retired room
+ beforeRetireTok, err := types.NewStreamTokenFromString(beforeRetireRes.NextBatch)
+ if err != nil {
+ t.Fatalf("NewStreamTokenFromString cannot parse next batch '%s' : %s", beforeRetireRes.NextBatch, err)
+ }
+ res = types.NewResponse()
+ res, err = db.IncrementalSync(ctx, res, testUserDeviceA, beforeRetireTok, latest, 0, false)
+ if err != nil {
+ t.Fatalf("IncrementalSync failed: %s", err)
+ }
+ assertInvitedToRooms(t, res, []string{})
+ if _, ok := res.Rooms.Leave[inviteRoom1]; !ok {
+ t.Fatalf("IncrementalSync: expected to see room left after it was retired but it wasn't")
+ }
+}
+
+func assertInvitedToRooms(t *testing.T, res *types.Response, roomIDs []string) {
+ t.Helper()
+ if len(res.Rooms.Invite) != len(roomIDs) {
+ t.Fatalf("got %d invited rooms, want %d", len(res.Rooms.Invite), len(roomIDs))
+ }
+ for _, roomID := range roomIDs {
+ if _, ok := res.Rooms.Invite[roomID]; !ok {
+ t.Fatalf("missing room ID %s", roomID)
+ }
+ }
+}
+
func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatrixserverlib.ClientEvent, wants []gomatrixserverlib.HeaderedEvent) {
if len(gots) != len(wants) {
t.Fatalf("%s response returned %d events, want %d", msg, len(gots), len(wants))
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 0b7d1595..246dc695 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -32,9 +32,9 @@ type AccountData interface {
type Invites interface {
InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error)
- DeleteInviteEvent(ctx context.Context, inviteEventID string) error
+ DeleteInviteEvent(ctx context.Context, inviteEventID string) (types.StreamPosition, error)
// SelectInviteEventsInRange returns a map of room ID to invite events.
- SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (map[string]gomatrixserverlib.HeaderedEvent, error)
+ SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]gomatrixserverlib.HeaderedEvent, retired map[string]gomatrixserverlib.HeaderedEvent, err error)
SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error)
}
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index 1094416a..019f2e69 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -290,10 +290,10 @@ type Response struct {
NextBatch string `json:"next_batch"`
AccountData struct {
Events []gomatrixserverlib.ClientEvent `json:"events"`
- } `json:"account_data"`
+ } `json:"account_data,omitempty"`
Presence struct {
Events []gomatrixserverlib.ClientEvent `json:"events"`
- } `json:"presence"`
+ } `json:"presence,omitempty"`
Rooms struct {
Join map[string]JoinResponse `json:"join"`
Invite map[string]InviteResponse `json:"invite"`