diff options
author | Kegsay <kegan@matrix.org> | 2020-06-26 11:07:52 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-06-26 11:07:52 +0100 |
commit | 4897beabeed3281f3e45a1426e6f1c9359e3152b (patch) | |
tree | 4746aecb87086d81de065ec30283c8ec48ff9f36 /syncapi | |
parent | c1d2382e6d7f459ddf911a16aac7d4e63d50838b (diff) |
Finish implementing retiring invites (#1166)
* Pass retired invites to the syncapi with the event ID of the invite
* Implement retire invite streaming
* Update whitelist
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/consumers/roomserver.go | 7 | ||||
-rw-r--r-- | syncapi/storage/interface.go | 4 | ||||
-rw-r--r-- | syncapi/storage/postgres/invites_table.go | 37 | ||||
-rw-r--r-- | syncapi/storage/shared/syncserver.go | 13 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/invites_table.go | 42 | ||||
-rw-r--r-- | syncapi/storage/storage_test.go | 77 | ||||
-rw-r--r-- | syncapi/storage/tables/interface.go | 4 | ||||
-rw-r--r-- | syncapi/types/types.go | 4 |
8 files changed, 142 insertions, 46 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 98be5bb7..af7f612b 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -157,7 +157,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( func (s *OutputRoomEventConsumer) onRetireInviteEvent( ctx context.Context, msg api.OutputRetireInviteEvent, ) error { - err := s.db.RetireInviteEvent(ctx, msg.EventID) + sp, err := s.db.RetireInviteEvent(ctx, msg.EventID) if err != nil { // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ @@ -166,8 +166,9 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( }).Panicf("roomserver output log: remove invite failure") return nil } - // TODO: Notify any active sync requests that the invite has been retired. - // s.notifier.OnNewEvent(nil, msg.TargetUserID, syncStreamPos) + // Notify any active sync requests that the invite has been retired. + // Invites share the same stream counter as PDUs + s.notifier.OnNewEvent(nil, "", []string{msg.TargetUserID}, types.NewStreamToken(sp, 0)) return nil } diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 7b3bd678..c693326b 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -78,9 +78,9 @@ type Database interface { // If the invite was successfully stored this returns the stream ID it was stored at. // Returns an error if there was a problem communicating with the database. AddInviteEvent(ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent) (types.StreamPosition, error) - // RetireInviteEvent removes an old invite event from the database. + // RetireInviteEvent removes an old invite event from the database. Returns the new position of the retired invite. // Returns an error if there was a problem communicating with the database. - RetireInviteEvent(ctx context.Context, inviteEventID string) error + RetireInviteEvent(ctx context.Context, inviteEventID string) (types.StreamPosition, error) // SetTypingTimeoutCallback sets a callback function that is called right after // a user is removed from the typing user list due to timeout. SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go index 5031d64e..530dc645 100644 --- a/syncapi/storage/postgres/invites_table.go +++ b/syncapi/storage/postgres/invites_table.go @@ -33,7 +33,8 @@ CREATE TABLE IF NOT EXISTS syncapi_invite_events ( event_id TEXT NOT NULL, room_id TEXT NOT NULL, target_user_id TEXT NOT NULL, - headered_event_json TEXT NOT NULL + headered_event_json TEXT NOT NULL, + deleted BOOL NOT NULL ); -- For looking up the invites for a given user. @@ -47,14 +48,14 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx const insertInviteEventSQL = "" + "INSERT INTO syncapi_invite_events (" + - " room_id, event_id, target_user_id, headered_event_json" + - ") VALUES ($1, $2, $3, $4) RETURNING id" + " room_id, event_id, target_user_id, headered_event_json, deleted" + + ") VALUES ($1, $2, $3, $4, FALSE) RETURNING id" const deleteInviteEventSQL = "" + - "DELETE FROM syncapi_invite_events WHERE event_id = $1" + "UPDATE syncapi_invite_events SET deleted=TRUE, id=nextval('syncapi_stream_id') WHERE event_id = $1 RETURNING id" const selectInviteEventsInRangeSQL = "" + - "SELECT room_id, headered_event_json FROM syncapi_invite_events" + + "SELECT room_id, headered_event_json, deleted FROM syncapi_invite_events" + " WHERE target_user_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id DESC" @@ -110,40 +111,46 @@ func (s *inviteEventsStatements) InsertInviteEvent( func (s *inviteEventsStatements) DeleteInviteEvent( ctx context.Context, inviteEventID string, -) error { - _, err := s.deleteInviteEventStmt.ExecContext(ctx, inviteEventID) - return err +) (sp types.StreamPosition, err error) { + err = s.deleteInviteEventStmt.QueryRowContext(ctx, inviteEventID).Scan(&sp) + return } // selectInviteEventsInRange returns a map of room ID to invite event for the // active invites for the target user ID in the supplied range. func (s *inviteEventsStatements) SelectInviteEventsInRange( ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range, -) (map[string]gomatrixserverlib.HeaderedEvent, error) { +) (map[string]gomatrixserverlib.HeaderedEvent, map[string]gomatrixserverlib.HeaderedEvent, error) { stmt := sqlutil.TxStmt(txn, s.selectInviteEventsInRangeStmt) rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High()) if err != nil { - return nil, err + return nil, nil, err } defer internal.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed") result := map[string]gomatrixserverlib.HeaderedEvent{} + retired := map[string]gomatrixserverlib.HeaderedEvent{} for rows.Next() { var ( roomID string eventJSON []byte + deleted bool ) - if err = rows.Scan(&roomID, &eventJSON); err != nil { - return nil, err + if err = rows.Scan(&roomID, &eventJSON, &deleted); err != nil { + return nil, nil, err } var event gomatrixserverlib.HeaderedEvent if err := json.Unmarshal(eventJSON, &event); err != nil { - return nil, err + return nil, nil, err } - result[roomID] = event + if deleted { + retired[roomID] = event + } else { + result[roomID] = event + } } - return result, rows.Err() + return result, retired, rows.Err() } func (s *inviteEventsStatements) SelectMaxInviteID( diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 74ae3eab..f84dc341 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -180,11 +180,8 @@ func (d *Database) AddInviteEvent( // Returns an error if there was a problem communicating with the database. func (d *Database) RetireInviteEvent( ctx context.Context, inviteEventID string, -) error { - // TODO: Record that invite has been retired in a stream so that we can - // notify the user in an incremental sync. - err := d.Invites.DeleteInviteEvent(ctx, inviteEventID) - return err +) (types.StreamPosition, error) { + return d.Invites.DeleteInviteEvent(ctx, inviteEventID) } // GetAccountDataInRange returns all account data for a given user inserted or @@ -724,7 +721,7 @@ func (d *Database) addInvitesToResponse( r types.Range, res *types.Response, ) error { - invites, err := d.Invites.SelectInviteEventsInRange( + invites, retiredInvites, err := d.Invites.SelectInviteEventsInRange( ctx, txn, userID, r, ) if err != nil { @@ -734,6 +731,10 @@ func (d *Database) addInvitesToResponse( ir := types.NewInviteResponse(inviteEvent) res.Rooms.Invite[roomID] = *ir } + for roomID := range retiredInvites { + lr := types.NewLeaveResponse() + res.Rooms.Leave[roomID] = *lr + } return nil } diff --git a/syncapi/storage/sqlite3/invites_table.go b/syncapi/storage/sqlite3/invites_table.go index bb58e345..aa051388 100644 --- a/syncapi/storage/sqlite3/invites_table.go +++ b/syncapi/storage/sqlite3/invites_table.go @@ -33,7 +33,8 @@ CREATE TABLE IF NOT EXISTS syncapi_invite_events ( event_id TEXT NOT NULL, room_id TEXT NOT NULL, target_user_id TEXT NOT NULL, - headered_event_json TEXT NOT NULL + headered_event_json TEXT NOT NULL, + deleted BOOL NOT NULL ); CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx ON syncapi_invite_events (target_user_id, id); @@ -42,14 +43,14 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx ON syncapi_invite_events const insertInviteEventSQL = "" + "INSERT INTO syncapi_invite_events" + - " (id, room_id, event_id, target_user_id, headered_event_json)" + - " VALUES ($1, $2, $3, $4, $5)" + " (id, room_id, event_id, target_user_id, headered_event_json, deleted)" + + " VALUES ($1, $2, $3, $4, $5, false)" const deleteInviteEventSQL = "" + - "DELETE FROM syncapi_invite_events WHERE event_id = $1" + "UPDATE syncapi_invite_events SET deleted=true, id=$1 WHERE event_id = $2" const selectInviteEventsInRangeSQL = "" + - "SELECT room_id, headered_event_json FROM syncapi_invite_events" + + "SELECT room_id, headered_event_json, deleted FROM syncapi_invite_events" + " WHERE target_user_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id DESC" @@ -114,40 +115,49 @@ func (s *inviteEventsStatements) InsertInviteEvent( func (s *inviteEventsStatements) DeleteInviteEvent( ctx context.Context, inviteEventID string, -) error { - _, err := s.deleteInviteEventStmt.ExecContext(ctx, inviteEventID) - return err +) (types.StreamPosition, error) { + streamPos, err := s.streamIDStatements.nextStreamID(ctx, nil) + if err != nil { + return streamPos, err + } + _, err = s.deleteInviteEventStmt.ExecContext(ctx, streamPos, inviteEventID) + return streamPos, err } // selectInviteEventsInRange returns a map of room ID to invite event for the // active invites for the target user ID in the supplied range. func (s *inviteEventsStatements) SelectInviteEventsInRange( ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range, -) (map[string]gomatrixserverlib.HeaderedEvent, error) { +) (map[string]gomatrixserverlib.HeaderedEvent, map[string]gomatrixserverlib.HeaderedEvent, error) { stmt := sqlutil.TxStmt(txn, s.selectInviteEventsInRangeStmt) rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High()) if err != nil { - return nil, err + return nil, nil, err } defer internal.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed") result := map[string]gomatrixserverlib.HeaderedEvent{} + retired := map[string]gomatrixserverlib.HeaderedEvent{} for rows.Next() { var ( roomID string eventJSON []byte + deleted bool ) - if err = rows.Scan(&roomID, &eventJSON); err != nil { - return nil, err + if err = rows.Scan(&roomID, &eventJSON, &deleted); err != nil { + return nil, nil, err } var event gomatrixserverlib.HeaderedEvent if err := json.Unmarshal(eventJSON, &event); err != nil { - return nil, err + return nil, nil, err + } + if deleted { + retired[roomID] = event + } else { + result[roomID] = event } - - result[roomID] = event } - return result, nil + return result, retired, nil } func (s *inviteEventsStatements) SelectMaxInviteID( diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 85084fac..feacbc18 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -601,6 +601,83 @@ func TestSendToDeviceBehaviour(t *testing.T) { } } +func TestInviteBehaviour(t *testing.T) { + db := MustCreateDatabase(t) + inviteRoom1 := "!inviteRoom1:somewhere" + inviteEvent1 := MustCreateEvent(t, inviteRoom1, nil, &gomatrixserverlib.EventBuilder{ + Content: []byte(fmt.Sprintf(`{"membership":"invite"}`)), + Type: "m.room.member", + StateKey: &testUserIDA, + Sender: "@inviteUser1:somewhere", + }) + inviteRoom2 := "!inviteRoom2:somewhere" + inviteEvent2 := MustCreateEvent(t, inviteRoom2, nil, &gomatrixserverlib.EventBuilder{ + Content: []byte(fmt.Sprintf(`{"membership":"invite"}`)), + Type: "m.room.member", + StateKey: &testUserIDA, + Sender: "@inviteUser2:somewhere", + }) + for _, ev := range []gomatrixserverlib.HeaderedEvent{inviteEvent1, inviteEvent2} { + _, err := db.AddInviteEvent(ctx, ev) + if err != nil { + t.Fatalf("Failed to AddInviteEvent: %s", err) + } + } + latest, err := db.SyncPosition(ctx) + if err != nil { + t.Fatalf("failed to get SyncPosition: %s", err) + } + // both invite events should appear in a new sync + beforeRetireRes := types.NewResponse() + beforeRetireRes, err = db.IncrementalSync(ctx, beforeRetireRes, testUserDeviceA, types.NewStreamToken(0, 0), latest, 0, false) + if err != nil { + t.Fatalf("IncrementalSync failed: %s", err) + } + assertInvitedToRooms(t, beforeRetireRes, []string{inviteRoom1, inviteRoom2}) + + // retire one event: a fresh sync should just return 1 invite room + if _, err = db.RetireInviteEvent(ctx, inviteEvent1.EventID()); err != nil { + t.Fatalf("Failed to RetireInviteEvent: %s", err) + } + latest, err = db.SyncPosition(ctx) + if err != nil { + t.Fatalf("failed to get SyncPosition: %s", err) + } + res := types.NewResponse() + res, err = db.IncrementalSync(ctx, res, testUserDeviceA, types.NewStreamToken(0, 0), latest, 0, false) + if err != nil { + t.Fatalf("IncrementalSync failed: %s", err) + } + assertInvitedToRooms(t, res, []string{inviteRoom2}) + + // a sync after we have received both invites should result in a leave for the retired room + beforeRetireTok, err := types.NewStreamTokenFromString(beforeRetireRes.NextBatch) + if err != nil { + t.Fatalf("NewStreamTokenFromString cannot parse next batch '%s' : %s", beforeRetireRes.NextBatch, err) + } + res = types.NewResponse() + res, err = db.IncrementalSync(ctx, res, testUserDeviceA, beforeRetireTok, latest, 0, false) + if err != nil { + t.Fatalf("IncrementalSync failed: %s", err) + } + assertInvitedToRooms(t, res, []string{}) + if _, ok := res.Rooms.Leave[inviteRoom1]; !ok { + t.Fatalf("IncrementalSync: expected to see room left after it was retired but it wasn't") + } +} + +func assertInvitedToRooms(t *testing.T, res *types.Response, roomIDs []string) { + t.Helper() + if len(res.Rooms.Invite) != len(roomIDs) { + t.Fatalf("got %d invited rooms, want %d", len(res.Rooms.Invite), len(roomIDs)) + } + for _, roomID := range roomIDs { + if _, ok := res.Rooms.Invite[roomID]; !ok { + t.Fatalf("missing room ID %s", roomID) + } + } +} + func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatrixserverlib.ClientEvent, wants []gomatrixserverlib.HeaderedEvent) { if len(gots) != len(wants) { t.Fatalf("%s response returned %d events, want %d", msg, len(gots), len(wants)) diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 0b7d1595..246dc695 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -32,9 +32,9 @@ type AccountData interface { type Invites interface { InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error) - DeleteInviteEvent(ctx context.Context, inviteEventID string) error + DeleteInviteEvent(ctx context.Context, inviteEventID string) (types.StreamPosition, error) // SelectInviteEventsInRange returns a map of room ID to invite events. - SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (map[string]gomatrixserverlib.HeaderedEvent, error) + SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]gomatrixserverlib.HeaderedEvent, retired map[string]gomatrixserverlib.HeaderedEvent, err error) SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error) } diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 1094416a..019f2e69 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -290,10 +290,10 @@ type Response struct { NextBatch string `json:"next_batch"` AccountData struct { Events []gomatrixserverlib.ClientEvent `json:"events"` - } `json:"account_data"` + } `json:"account_data,omitempty"` Presence struct { Events []gomatrixserverlib.ClientEvent `json:"events"` - } `json:"presence"` + } `json:"presence,omitempty"` Rooms struct { Join map[string]JoinResponse `json:"join"` Invite map[string]InviteResponse `json:"invite"` |