diff options
author | Kegsay <kegan@matrix.org> | 2020-07-08 17:45:39 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-08 17:45:39 +0100 |
commit | d9648b0615f3a7b1d8a824777783f19fa46697f4 (patch) | |
tree | ef31525f671b96e8e765fbb3cd3f325d8a9ee8a7 /syncapi | |
parent | a5a51b41416e9b1d8084dbc759dff735133817fa (diff) |
Finish implementing redactions (#1189)
* Add a bit more logging to the fedsender
* bugfix: continue sending PDUs if ones are added whilst sending another PDU
Without this, the queue goes back to sleep on `<-oq.notifyPDUs` which won't
fire because `pendingPDUs` is already > 0. This should fix a flakey sytest.
* Break if no txn is sent
* WIP syncapi work
* More debugging
* Bump GMSL version to pull in working Event.Redact
* Remove logging
* Make redactions work on v3+
* Fix more tests
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/consumers/roomserver.go | 34 | ||||
-rw-r--r-- | syncapi/storage/interface.go | 2 | ||||
-rw-r--r-- | syncapi/storage/postgres/output_room_events_table.go | 16 | ||||
-rw-r--r-- | syncapi/storage/shared/syncserver.go | 21 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/output_room_events_table.go | 16 | ||||
-rw-r--r-- | syncapi/storage/tables/interface.go | 1 |
6 files changed, 85 insertions, 5 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index af7f612b..c6502716 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -81,11 +81,23 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { switch output.Type { case api.OutputTypeNewRoomEvent: + // Ignore redaction events. We will add them to the database when they are + // validated (when we receive OutputTypeRedactedEvent) + event := output.NewRoomEvent.Event + if event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil { + // in the special case where the event redacts itself, just pass the message through because + // we will never see the other part of the pair + if event.Redacts() != event.EventID() { + return nil + } + } return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) case api.OutputTypeNewInviteEvent: return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) case api.OutputTypeRetireInviteEvent: return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent) + case api.OutputTypeRedactedEvent: + return s.onRedactEvent(context.TODO(), *output.RedactedEvent) default: log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", @@ -94,11 +106,25 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { } } +func (s *OutputRoomEventConsumer) onRedactEvent( + ctx context.Context, msg api.OutputRedactedEvent, +) error { + err := s.db.RedactEvent(ctx, msg.RedactedEventID, &msg.RedactedBecause) + if err != nil { + log.WithError(err).Error("RedactEvent error'd") + return err + } + // fake a room event so we notify clients about the redaction, as if it were + // a normal event. + return s.onNewRoomEvent(ctx, api.OutputNewRoomEvent{ + Event: msg.RedactedBecause, + }) +} + func (s *OutputRoomEventConsumer) onNewRoomEvent( ctx context.Context, msg api.OutputNewRoomEvent, ) error { ev := msg.Event - addsStateEvents := msg.AddsState() ev, err := s.updateStateEvent(ev) @@ -173,12 +199,10 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( } func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) { - var stateKey string if event.StateKey() == nil { - stateKey = "" - } else { - stateKey = *event.StateKey() + return event, nil } + stateKey := *event.StateKey() prevEvent, err := s.db.GetStateEvent( context.TODO(), event.RoomID(), event.Type(), stateKey, diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index c4dae4d0..a5e13b67 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -136,4 +136,6 @@ type Database interface { // Returns the filterID as a string. Otherwise returns an error if something // goes wrong. PutFilter(ctx context.Context, localpart string, filter *gomatrixserverlib.Filter) (string, error) + // RedactEvent wipes an event in the database and sets the unsigned.redacted_because key to the redaction event + RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error } diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index c7c4dc63..5315de24 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -99,6 +99,9 @@ const selectEarlyEventsSQL = "" + const selectMaxEventIDSQL = "" + "SELECT MAX(id) FROM syncapi_output_room_events" +const updateEventJSONSQL = "" + + "UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2" + // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). const selectStateInRangeSQL = "" + "SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + @@ -120,6 +123,7 @@ type outputRoomEventsStatements struct { selectRecentEventsForSyncStmt *sql.Stmt selectEarlyEventsStmt *sql.Stmt selectStateInRangeStmt *sql.Stmt + updateEventJSONStmt *sql.Stmt } func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { @@ -149,9 +153,21 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { return nil, err } + if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil { + return nil, err + } return s, nil } +func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error { + headeredJSON, err := json.Marshal(event) + if err != nil { + return err + } + _, err = s.updateEventJSONStmt.ExecContext(ctx, headeredJSON, event.EventID()) + return err +} + // selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos. // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // two positions, only the most recent state is returned. diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 01362ddd..38b503cd 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -24,6 +24,7 @@ import ( userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage/tables" @@ -597,6 +598,26 @@ func (d *Database) IncrementalSync( return res, nil } +func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error { + redactedEvents, err := d.Events(ctx, []string{redactedEventID}) + if err != nil { + return err + } + if len(redactedEvents) == 0 { + logrus.WithField("event_id", redactedEventID).WithField("redaction_event", redactedBecause.EventID()).Warnf("missing redacted event for redaction") + return nil + } + eventToRedact := redactedEvents[0].Unwrap() + redactionEvent := redactedBecause.Unwrap() + ev, err := eventutil.RedactEvent(&redactionEvent, &eventToRedact) + if err != nil { + return err + } + + newEvent := ev.Headered(redactedBecause.RoomVersion) + return d.OutputEvents.UpdateEventJSON(ctx, &newEvent) +} + // getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed // to it. It returns toPos and joinedRoomIDs for use of adding EDUs. // nolint:nakedret diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 0c909cc4..da2ea3f6 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -76,6 +76,9 @@ const selectEarlyEventsSQL = "" + const selectMaxEventIDSQL = "" + "SELECT MAX(id) FROM syncapi_output_room_events" +const updateEventJSONSQL = "" + + "UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2" + // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). /* $1 = oldPos, @@ -109,6 +112,7 @@ type outputRoomEventsStatements struct { selectRecentEventsForSyncStmt *sql.Stmt selectEarlyEventsStmt *sql.Stmt selectStateInRangeStmt *sql.Stmt + updateEventJSONStmt *sql.Stmt } func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) { @@ -140,9 +144,21 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { return nil, err } + if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil { + return nil, err + } return s, nil } +func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error { + headeredJSON, err := json.Marshal(event) + if err != nil { + return err + } + _, err = s.updateEventJSONStmt.ExecContext(ctx, headeredJSON, event.EventID()) + return err +} + // selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos. // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // two positions, only the most recent state is returned. diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 4ac0be4e..9d239d23 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -49,6 +49,7 @@ type Events interface { // SelectEarlyEvents returns the earliest events in the given room. SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error) SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error) + UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error } // Topology keeps track of the depths and stream positions for all events. |