aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-07-08 17:45:39 +0100
committerGitHub <noreply@github.com>2020-07-08 17:45:39 +0100
commitd9648b0615f3a7b1d8a824777783f19fa46697f4 (patch)
treeef31525f671b96e8e765fbb3cd3f325d8a9ee8a7
parenta5a51b41416e9b1d8084dbc759dff735133817fa (diff)
Finish implementing redactions (#1189)
* Add a bit more logging to the fedsender * bugfix: continue sending PDUs if ones are added whilst sending another PDU Without this, the queue goes back to sleep on `<-oq.notifyPDUs` which won't fire because `pendingPDUs` is already > 0. This should fix a flakey sytest. * Break if no txn is sent * WIP syncapi work * More debugging * Bump GMSL version to pull in working Event.Redact * Remove logging * Make redactions work on v3+ * Fix more tests
-rw-r--r--currentstateserver/storage/shared/storage.go6
-rw-r--r--go.mod2
-rw-r--r--go.sum6
-rw-r--r--internal/eventutil/events.go5
-rw-r--r--roomserver/api/output.go16
-rw-r--r--roomserver/internal/query.go11
-rw-r--r--roomserver/storage/shared/storage.go4
-rw-r--r--syncapi/consumers/roomserver.go34
-rw-r--r--syncapi/storage/interface.go2
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go16
-rw-r--r--syncapi/storage/shared/syncserver.go21
-rw-r--r--syncapi/storage/sqlite3/output_room_events_table.go16
-rw-r--r--syncapi/storage/tables/interface.go1
-rw-r--r--sytest-whitelist16
14 files changed, 131 insertions, 25 deletions
diff --git a/currentstateserver/storage/shared/storage.go b/currentstateserver/storage/shared/storage.go
index 362dafe9..66b979d8 100644
--- a/currentstateserver/storage/shared/storage.go
+++ b/currentstateserver/storage/shared/storage.go
@@ -23,7 +23,6 @@ import (
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
- "github.com/matrix-org/util"
)
type Database struct {
@@ -45,10 +44,7 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda
return err
}
if len(events) != 1 {
- // this should never happen but is non-fatal
- util.GetLogger(ctx).WithField("redacted_event_id", redactedEventID).WithField("redaction_event_id", redactedBecause.EventID()).Warnf(
- "RedactEvent: missing redacted event",
- )
+ // this will happen for all non-state events
return nil
}
redactionEvent := redactedBecause.Unwrap()
diff --git a/go.mod b/go.mod
index b007b09a..5c896a37 100644
--- a/go.mod
+++ b/go.mod
@@ -21,7 +21,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
- github.com/matrix-org/gomatrixserverlib v0.0.0-20200707103800-7470b03f069b
+ github.com/matrix-org/gomatrixserverlib v0.0.0-20200708152912-d034ccb75e2d
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7
github.com/mattn/go-sqlite3 v2.0.2+incompatible
diff --git a/go.sum b/go.sum
index 2b4644b6..1fd01ce9 100644
--- a/go.sum
+++ b/go.sum
@@ -422,8 +422,10 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 h1:Yb+Wlf
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo=
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bhrnp3Ky1qgx/fzCtCALOoGYylh2tpS9K4=
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
-github.com/matrix-org/gomatrixserverlib v0.0.0-20200707103800-7470b03f069b h1:g1ueoPHI5tpafw/QysVfDw43FwRTPqz8sT+MZbK54yk=
-github.com/matrix-org/gomatrixserverlib v0.0.0-20200707103800-7470b03f069b/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
+github.com/matrix-org/gomatrixserverlib v0.0.0-20200708143827-8bfb7222929b h1:mHFK2pcy+fhettE42aoq+JTcj3ysqFqkQLlUpC33Fwg=
+github.com/matrix-org/gomatrixserverlib v0.0.0-20200708143827-8bfb7222929b/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
+github.com/matrix-org/gomatrixserverlib v0.0.0-20200708152912-d034ccb75e2d h1:v3MGEwCLlFjzZcYJu+aI3kUoNxQyCM3DUurjJFlaI04=
+github.com/matrix-org/gomatrixserverlib v0.0.0-20200708152912-d034ccb75e2d/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y=
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f/go.mod h1:y0oDTjZDv5SM9a2rp3bl+CU+bvTRINQsdb7YlDql5Go=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo=
diff --git a/internal/eventutil/events.go b/internal/eventutil/events.go
index e3b8f3d3..1e3afac8 100644
--- a/internal/eventutil/events.go
+++ b/internal/eventutil/events.go
@@ -162,5 +162,10 @@ func RedactEvent(redactionEvent, redactedEvent *gomatrixserverlib.Event) (*gomat
if err != nil {
return nil, err
}
+ // NOTSPEC: sytest relies on this unspecced field existing :(
+ err = r.SetUnsignedField("redacted_by", redactionEvent.EventID())
+ if err != nil {
+ return nil, err
+ }
return &r, nil
}
diff --git a/roomserver/api/output.go b/roomserver/api/output.go
index b25353ae..d6c09f9e 100644
--- a/roomserver/api/output.go
+++ b/roomserver/api/output.go
@@ -29,6 +29,22 @@ const (
// OutputTypeRetireInviteEvent indicates that the event is an OutputRetireInviteEvent
OutputTypeRetireInviteEvent OutputType = "retire_invite_event"
// OutputTypeRedactedEvent indicates that the event is an OutputRedactedEvent
+ //
+ // This event is emitted when a redaction has been 'validated' (meaning both the redaction and the event to redact are known).
+ // Redaction validation happens when the roomserver receives either:
+ // - A redaction for which we have the event to redact.
+ // - Any event for which we have a redaction.
+ // When the roomserver receives an event, it will check against the redactions table to see if there is a matching redaction
+ // for the event. If there is, it will mark the redaction as validated and emit this event. In the common case of a redaction
+ // happening after receiving the event to redact, the roomserver will emit a OutputTypeNewRoomEvent of m.room.redaction
+ // immediately followed by a OutputTypeRedactedEvent. In the uncommon case of receiving the redaction BEFORE the event to redact,
+ // the roomserver will emit a OutputTypeNewRoomEvent of the event to redact immediately followed by a OutputTypeRedactedEvent.
+ //
+ // In order to honour redactions correctly, downstream components must ignore m.room.redaction events emitted via OutputTypeNewRoomEvent.
+ // When downstream components receive an OutputTypeRedactedEvent they must:
+ // - Pull out the event to redact from the database. They should have this because the redaction is validated.
+ // - Redact the event and set the corresponding `unsigned` fields to indicate it as redacted.
+ // - Replace the event in the database.
OutputTypeRedactedEvent OutputType = "redacted_event"
)
diff --git a/roomserver/internal/query.go b/roomserver/internal/query.go
index ca4af0b2..bede6c88 100644
--- a/roomserver/internal/query.go
+++ b/roomserver/internal/query.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
+ "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/auth"
"github.com/matrix-org/dendrite/roomserver/state"
@@ -867,7 +868,7 @@ func getAuthChain(
func persistEvents(ctx context.Context, db storage.Database, events []gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) {
var roomNID types.RoomNID
backfilledEventMap := make(map[string]types.Event)
- for _, ev := range events {
+ for j, ev := range events {
nidMap, err := db.EventNIDs(ctx, ev.AuthEventIDs())
if err != nil { // this shouldn't happen as RequestBackfill already found them
logrus.WithError(err).WithField("auth_events", ev.AuthEventIDs()).Error("Failed to find one or more auth events")
@@ -891,12 +892,14 @@ func persistEvents(ctx context.Context, db storage.Database, events []gomatrixse
// It's also possible for this event to be a redaction which results in another event being
// redacted, which we don't care about since we aren't returning it in this backfill.
if redactedEventID == ev.EventID() {
- ev = ev.Redact().Headered(ev.RoomVersion)
- err = ev.SetUnsignedField("redacted_because", redactionEvent)
+ eventToRedact := ev.Unwrap()
+ redactedEvent, err := eventutil.RedactEvent(redactionEvent, &eventToRedact)
if err != nil {
- logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to set unsigned field")
+ logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to redact event")
continue
}
+ ev = redactedEvent.Headered(ev.RoomVersion)
+ events[j] = ev
}
backfilledEventMap[ev.EventID()] = types.Event{
EventNID: stateAtEvent.StateEntry.EventNID,
diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go
index a9cb5782..e2e5daf9 100644
--- a/roomserver/storage/shared/storage.go
+++ b/roomserver/storage/shared/storage.go
@@ -563,6 +563,10 @@ func (d *Database) handleRedactions(
// we've seen this redaction before or there is nothing to redact
return nil, "", nil
}
+ if redactedEvent.RoomID() != redactionEvent.RoomID() {
+ // redactions across rooms aren't allowed
+ return nil, "", nil
+ }
// mark the event as redacted
err = redactedEvent.SetUnsignedField("redacted_because", redactionEvent)
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index af7f612b..c6502716 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -81,11 +81,23 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
switch output.Type {
case api.OutputTypeNewRoomEvent:
+ // Ignore redaction events. We will add them to the database when they are
+ // validated (when we receive OutputTypeRedactedEvent)
+ event := output.NewRoomEvent.Event
+ if event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil {
+ // in the special case where the event redacts itself, just pass the message through because
+ // we will never see the other part of the pair
+ if event.Redacts() != event.EventID() {
+ return nil
+ }
+ }
return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
case api.OutputTypeNewInviteEvent:
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
case api.OutputTypeRetireInviteEvent:
return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent)
+ case api.OutputTypeRedactedEvent:
+ return s.onRedactEvent(context.TODO(), *output.RedactedEvent)
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
@@ -94,11 +106,25 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}
}
+func (s *OutputRoomEventConsumer) onRedactEvent(
+ ctx context.Context, msg api.OutputRedactedEvent,
+) error {
+ err := s.db.RedactEvent(ctx, msg.RedactedEventID, &msg.RedactedBecause)
+ if err != nil {
+ log.WithError(err).Error("RedactEvent error'd")
+ return err
+ }
+ // fake a room event so we notify clients about the redaction, as if it were
+ // a normal event.
+ return s.onNewRoomEvent(ctx, api.OutputNewRoomEvent{
+ Event: msg.RedactedBecause,
+ })
+}
+
func (s *OutputRoomEventConsumer) onNewRoomEvent(
ctx context.Context, msg api.OutputNewRoomEvent,
) error {
ev := msg.Event
-
addsStateEvents := msg.AddsState()
ev, err := s.updateStateEvent(ev)
@@ -173,12 +199,10 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
}
func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) {
- var stateKey string
if event.StateKey() == nil {
- stateKey = ""
- } else {
- stateKey = *event.StateKey()
+ return event, nil
}
+ stateKey := *event.StateKey()
prevEvent, err := s.db.GetStateEvent(
context.TODO(), event.RoomID(), event.Type(), stateKey,
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index c4dae4d0..a5e13b67 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -136,4 +136,6 @@ type Database interface {
// Returns the filterID as a string. Otherwise returns an error if something
// goes wrong.
PutFilter(ctx context.Context, localpart string, filter *gomatrixserverlib.Filter) (string, error)
+ // RedactEvent wipes an event in the database and sets the unsigned.redacted_because key to the redaction event
+ RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error
}
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index c7c4dc63..5315de24 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -99,6 +99,9 @@ const selectEarlyEventsSQL = "" +
const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events"
+const updateEventJSONSQL = "" +
+ "UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
+
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" +
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
@@ -120,6 +123,7 @@ type outputRoomEventsStatements struct {
selectRecentEventsForSyncStmt *sql.Stmt
selectEarlyEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
+ updateEventJSONStmt *sql.Stmt
}
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@@ -149,9 +153,21 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
return nil, err
}
+ if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil {
+ return nil, err
+ }
return s, nil
}
+func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
+ headeredJSON, err := json.Marshal(event)
+ if err != nil {
+ return err
+ }
+ _, err = s.updateEventJSONStmt.ExecContext(ctx, headeredJSON, event.EventID())
+ return err
+}
+
// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index 01362ddd..38b503cd 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -24,6 +24,7 @@ import (
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/eduserver/cache"
+ "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
@@ -597,6 +598,26 @@ func (d *Database) IncrementalSync(
return res, nil
}
+func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error {
+ redactedEvents, err := d.Events(ctx, []string{redactedEventID})
+ if err != nil {
+ return err
+ }
+ if len(redactedEvents) == 0 {
+ logrus.WithField("event_id", redactedEventID).WithField("redaction_event", redactedBecause.EventID()).Warnf("missing redacted event for redaction")
+ return nil
+ }
+ eventToRedact := redactedEvents[0].Unwrap()
+ redactionEvent := redactedBecause.Unwrap()
+ ev, err := eventutil.RedactEvent(&redactionEvent, &eventToRedact)
+ if err != nil {
+ return err
+ }
+
+ newEvent := ev.Headered(redactedBecause.RoomVersion)
+ return d.OutputEvents.UpdateEventJSON(ctx, &newEvent)
+}
+
// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed
// to it. It returns toPos and joinedRoomIDs for use of adding EDUs.
// nolint:nakedret
diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go
index 0c909cc4..da2ea3f6 100644
--- a/syncapi/storage/sqlite3/output_room_events_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_table.go
@@ -76,6 +76,9 @@ const selectEarlyEventsSQL = "" +
const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events"
+const updateEventJSONSQL = "" +
+ "UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
+
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
/*
$1 = oldPos,
@@ -109,6 +112,7 @@ type outputRoomEventsStatements struct {
selectRecentEventsForSyncStmt *sql.Stmt
selectEarlyEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
+ updateEventJSONStmt *sql.Stmt
}
func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) {
@@ -140,9 +144,21 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
return nil, err
}
+ if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil {
+ return nil, err
+ }
return s, nil
}
+func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
+ headeredJSON, err := json.Marshal(event)
+ if err != nil {
+ return err
+ }
+ _, err = s.updateEventJSONStmt.ExecContext(ctx, headeredJSON, event.EventID())
+ return err
+}
+
// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 4ac0be4e..9d239d23 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -49,6 +49,7 @@ type Events interface {
// SelectEarlyEvents returns the earliest events in the given room.
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error)
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
+ UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error
}
// Topology keeps track of the depths and stream positions for all events.
diff --git a/sytest-whitelist b/sytest-whitelist
index 30380af0..0628ea26 100644
--- a/sytest-whitelist
+++ b/sytest-whitelist
@@ -160,14 +160,6 @@ User can create and send/receive messages in a room with version 1
POST /createRoom ignores attempts to set the room version via creation_content
Inbound federation rejects remote attempts to join local users to rooms
Inbound federation rejects remote attempts to kick local users to rooms
-# SyTest currently only implements the v1 endpoints for /send_join and /send_leave,
-# whereas Dendrite only supports the v2 endpoints for those, so let's ignore this
-# test for now.
-#An event which redacts itself should be ignored
-# SyTest currently only implements the v1 endpoints for /send_join and /send_leave,
-# whereas Dendrite only supports the v2 endpoints for those, so let's ignore this
-# test for now.
-#A pair of events which redact each other should be ignored
Full state sync includes joined rooms
A message sent after an initial sync appears in the timeline of an incremental sync.
Can add tag
@@ -295,6 +287,14 @@ POST /rooms/:room_id/redact/:event_id as random user does not redact message
POST /redact disallows redaction of event in different room
An event which redacts itself should be ignored
A pair of events which redact each other should be ignored
+Redaction of a redaction redacts the redaction reason
+An event which redacts an event in a different room should be ignored
+Can receive redactions from regular users over federation in room version 1
+Can receive redactions from regular users over federation in room version 2
+Can receive redactions from regular users over federation in room version 3
+Can receive redactions from regular users over federation in room version 4
+Can receive redactions from regular users over federation in room version 5
+Can receive redactions from regular users over federation in room version 6
Outbound federation can backfill events
Inbound federation can backfill events
Backfill checks the events requested belong to the room