aboutsummaryrefslogtreecommitdiff
path: root/roomserver
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-09-16 13:00:52 +0100
committerGitHub <noreply@github.com>2020-09-16 13:00:52 +0100
commit18231f25b437d2f03b3be1e0536fc46d45c8691f (patch)
tree5a3ef66c25268df8214be69f0c9e1f5f925da14f /roomserver
parentba6c7c4a5c4166b7085343886ab69ef331238ff4 (diff)
Implement rejected events (#1426)
* WIP Event rejection * Still send back errors for rejected events Instead, discard them at the federationapi /send layer rather than re-implementing checks at the clientapi/PerformJoin layer. * Implement rejected events Critically, rejected events CAN cause state resolution to happen as it can merge forks in the DAG. This is fine, _provided_ we do not add the rejected event when performing state resolution, which is what this PR does. It also fixes the error handling when NotAllowed happens, as we were checking too early and needlessly handling NotAllowed in more than one place. * Update test to match reality * Modify InputRoomEvents to no longer return an error Errors do not serialise across HTTP boundaries in polylith mode, so instead set fields on the InputRoomEventsResponse. Add `Err()` function to make the API shape basically the same. * Remove redundant returns; linting * Update blacklist
Diffstat (limited to 'roomserver')
-rw-r--r--roomserver/api/api.go2
-rw-r--r--roomserver/api/api_trace.go7
-rw-r--r--roomserver/api/input.go16
-rw-r--r--roomserver/api/wrapper.go3
-rw-r--r--roomserver/internal/alias.go3
-rw-r--r--roomserver/internal/input/input.go8
-rw-r--r--roomserver/internal/input/input_events.go31
-rw-r--r--roomserver/internal/perform/perform_backfill.go2
-rw-r--r--roomserver/internal/perform/perform_invite.go3
-rw-r--r--roomserver/internal/perform/perform_join.go3
-rw-r--r--roomserver/internal/perform/perform_leave.go3
-rw-r--r--roomserver/internal/query/query.go1
-rw-r--r--roomserver/inthttp/client.go7
-rw-r--r--roomserver/inthttp/server.go4
-rw-r--r--roomserver/roomserver_test.go8
-rw-r--r--roomserver/state/state.go5
-rw-r--r--roomserver/storage/interface.go1
-rw-r--r--roomserver/storage/postgres/events_table.go12
-rw-r--r--roomserver/storage/shared/storage.go7
-rw-r--r--roomserver/storage/sqlite3/events_table.go13
-rw-r--r--roomserver/storage/tables/interface.go5
-rw-r--r--roomserver/types/types.go3
22 files changed, 97 insertions, 50 deletions
diff --git a/roomserver/api/api.go b/roomserver/api/api.go
index eecefe32..2495157a 100644
--- a/roomserver/api/api.go
+++ b/roomserver/api/api.go
@@ -16,7 +16,7 @@ type RoomserverInternalAPI interface {
ctx context.Context,
request *InputRoomEventsRequest,
response *InputRoomEventsResponse,
- ) error
+ )
PerformInvite(
ctx context.Context,
diff --git a/roomserver/api/api_trace.go b/roomserver/api/api_trace.go
index 64330930..b7accb9a 100644
--- a/roomserver/api/api_trace.go
+++ b/roomserver/api/api_trace.go
@@ -23,10 +23,9 @@ func (t *RoomserverInternalAPITrace) InputRoomEvents(
ctx context.Context,
req *InputRoomEventsRequest,
res *InputRoomEventsResponse,
-) error {
- err := t.Impl.InputRoomEvents(ctx, req, res)
- util.GetLogger(ctx).WithError(err).Infof("InputRoomEvents req=%+v res=%+v", js(req), js(res))
- return err
+) {
+ t.Impl.InputRoomEvents(ctx, req, res)
+ util.GetLogger(ctx).Infof("InputRoomEvents req=%+v res=%+v", js(req), js(res))
}
func (t *RoomserverInternalAPITrace) PerformInvite(
diff --git a/roomserver/api/input.go b/roomserver/api/input.go
index 651c0e9f..862a6fa1 100644
--- a/roomserver/api/input.go
+++ b/roomserver/api/input.go
@@ -16,6 +16,8 @@
package api
import (
+ "fmt"
+
"github.com/matrix-org/gomatrixserverlib"
)
@@ -87,4 +89,18 @@ type InputRoomEventsRequest struct {
// InputRoomEventsResponse is a response to InputRoomEvents
type InputRoomEventsResponse struct {
+ ErrMsg string // set if there was any error
+ NotAllowed bool // true if an event in the input was not allowed.
+}
+
+func (r *InputRoomEventsResponse) Err() error {
+ if r.ErrMsg == "" {
+ return nil
+ }
+ if r.NotAllowed {
+ return &gomatrixserverlib.NotAllowed{
+ Message: r.ErrMsg,
+ }
+ }
+ return fmt.Errorf("InputRoomEventsResponse: %s", r.ErrMsg)
}
diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go
index e5339311..cc048ddd 100644
--- a/roomserver/api/wrapper.go
+++ b/roomserver/api/wrapper.go
@@ -187,7 +187,8 @@ func SendInputRoomEvents(
) error {
request := InputRoomEventsRequest{InputRoomEvents: ires}
var response InputRoomEventsResponse
- return rsAPI.InputRoomEvents(ctx, &request, &response)
+ rsAPI.InputRoomEvents(ctx, &request, &response)
+ return response.Err()
}
// SendInvite event to the roomserver.
diff --git a/roomserver/internal/alias.go b/roomserver/internal/alias.go
index d576a817..3e023d2a 100644
--- a/roomserver/internal/alias.go
+++ b/roomserver/internal/alias.go
@@ -271,5 +271,6 @@ func (r *RoomserverInternalAPI) sendUpdatedAliasesEvent(
var inputRes api.InputRoomEventsResponse
// Send the request
- return r.InputRoomEvents(ctx, &inputReq, &inputRes)
+ r.InputRoomEvents(ctx, &inputReq, &inputRes)
+ return inputRes.Err()
}
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index 51d20ad3..d340ac21 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -110,7 +110,7 @@ func (r *Inputer) InputRoomEvents(
ctx context.Context,
request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse,
-) error {
+) {
// Create a wait group. Each task that we dispatch will call Done on
// this wait group so that we know when all of our events have been
// processed.
@@ -156,8 +156,10 @@ func (r *Inputer) InputRoomEvents(
// that back to the caller.
for _, task := range tasks {
if task.err != nil {
- return task.err
+ response.ErrMsg = task.err.Error()
+ _, rejected := task.err.(*gomatrixserverlib.NotAllowed)
+ response.NotAllowed = rejected
+ return
}
}
- return nil
}
diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go
index daf1afcd..0558cd76 100644
--- a/roomserver/internal/input/input_events.go
+++ b/roomserver/internal/input/input_events.go
@@ -46,10 +46,11 @@ func (r *Inputer) processRoomEvent(
// Check that the event passes authentication checks and work out
// the numeric IDs for the auth events.
- authEventNIDs, err := helpers.CheckAuthEvents(ctx, r.DB, headered, input.AuthEventIDs)
- if err != nil {
- logrus.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("processRoomEvent.checkAuthEvents failed for event")
- return
+ isRejected := false
+ authEventNIDs, rejectionErr := helpers.CheckAuthEvents(ctx, r.DB, headered, input.AuthEventIDs)
+ if rejectionErr != nil {
+ logrus.WithError(rejectionErr).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("processRoomEvent.checkAuthEvents failed for event, rejecting event")
+ isRejected = true
}
// If we don't have a transaction ID then get one.
@@ -65,12 +66,13 @@ func (r *Inputer) processRoomEvent(
}
// Store the event.
- _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, input.TransactionID, authEventNIDs)
+ _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, input.TransactionID, authEventNIDs, isRejected)
if err != nil {
return "", fmt.Errorf("r.DB.StoreEvent: %w", err)
}
+
// if storing this event results in it being redacted then do so.
- if redactedEventID == event.EventID() {
+ if !isRejected && redactedEventID == event.EventID() {
r, rerr := eventutil.RedactEvent(redactionEvent, &event)
if rerr != nil {
return "", fmt.Errorf("eventutil.RedactEvent: %w", rerr)
@@ -101,12 +103,22 @@ func (r *Inputer) processRoomEvent(
if stateAtEvent.BeforeStateSnapshotNID == 0 {
// We haven't calculated a state for this event yet.
// Lets calculate one.
- err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event)
+ err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected)
if err != nil {
return "", fmt.Errorf("r.calculateAndSetState: %w", err)
}
}
+ // We stop here if the event is rejected: We've stored it but won't update forward extremities or notify anyone about it.
+ if isRejected {
+ logrus.WithFields(logrus.Fields{
+ "event_id": event.EventID(),
+ "type": event.Type(),
+ "room": event.RoomID(),
+ }).Debug("Stored rejected event")
+ return event.EventID(), rejectionErr
+ }
+
if input.Kind == api.KindRewrite {
logrus.WithFields(logrus.Fields{
"event_id": event.EventID(),
@@ -157,11 +169,12 @@ func (r *Inputer) calculateAndSetState(
roomInfo types.RoomInfo,
stateAtEvent *types.StateAtEvent,
event gomatrixserverlib.Event,
+ isRejected bool,
) error {
var err error
roomState := state.NewStateResolution(r.DB, roomInfo)
- if input.HasState {
+ if input.HasState && !isRejected {
// Check here if we think we're in the room already.
stateAtEvent.Overwrite = true
var joinEventNIDs []types.EventNID
@@ -188,7 +201,7 @@ func (r *Inputer) calculateAndSetState(
stateAtEvent.Overwrite = false
// We haven't been told what the state at the event is so we need to calculate it from the prev_events
- if stateAtEvent.BeforeStateSnapshotNID, err = roomState.CalculateAndStoreStateBeforeEvent(ctx, event); err != nil {
+ if stateAtEvent.BeforeStateSnapshotNID, err = roomState.CalculateAndStoreStateBeforeEvent(ctx, event, isRejected); err != nil {
return fmt.Errorf("roomState.CalculateAndStoreStateBeforeEvent: %w", err)
}
}
diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go
index 668c8078..eb1aa99b 100644
--- a/roomserver/internal/perform/perform_backfill.go
+++ b/roomserver/internal/perform/perform_backfill.go
@@ -535,7 +535,7 @@ func persistEvents(ctx context.Context, db storage.Database, events []gomatrixse
var stateAtEvent types.StateAtEvent
var redactedEventID string
var redactionEvent *gomatrixserverlib.Event
- roomNID, stateAtEvent, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids)
+ roomNID, stateAtEvent, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids, false)
if err != nil {
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to persist event")
continue
diff --git a/roomserver/internal/perform/perform_invite.go b/roomserver/internal/perform/perform_invite.go
index e06ad062..d6a64e7e 100644
--- a/roomserver/internal/perform/perform_invite.go
+++ b/roomserver/internal/perform/perform_invite.go
@@ -183,7 +183,8 @@ func (r *Inviter) PerformInvite(
},
}
inputRes := &api.InputRoomEventsResponse{}
- if err = r.Inputer.InputRoomEvents(context.Background(), inputReq, inputRes); err != nil {
+ r.Inputer.InputRoomEvents(context.Background(), inputReq, inputRes)
+ if err = inputRes.Err(); err != nil {
return nil, fmt.Errorf("r.InputRoomEvents: %w", err)
}
} else {
diff --git a/roomserver/internal/perform/perform_join.go b/roomserver/internal/perform/perform_join.go
index f76806c7..e9aebb83 100644
--- a/roomserver/internal/perform/perform_join.go
+++ b/roomserver/internal/perform/perform_join.go
@@ -247,7 +247,8 @@ func (r *Joiner) performJoinRoomByID(
},
}
inputRes := api.InputRoomEventsResponse{}
- if err = r.Inputer.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil {
+ r.Inputer.InputRoomEvents(ctx, &inputReq, &inputRes)
+ if err = inputRes.Err(); err != nil {
var notAllowed *gomatrixserverlib.NotAllowed
if errors.As(err, &notAllowed) {
return "", &api.PerformError{
diff --git a/roomserver/internal/perform/perform_leave.go b/roomserver/internal/perform/perform_leave.go
index aaa3b5b1..6aaf1bf3 100644
--- a/roomserver/internal/perform/perform_leave.go
+++ b/roomserver/internal/perform/perform_leave.go
@@ -139,7 +139,8 @@ func (r *Leaver) performLeaveRoomByID(
},
}
inputRes := api.InputRoomEventsResponse{}
- if err = r.Inputer.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil {
+ r.Inputer.InputRoomEvents(ctx, &inputReq, &inputRes)
+ if err = inputRes.Err(); err != nil {
return nil, fmt.Errorf("r.InputRoomEvents: %w", err)
}
diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go
index b34ae770..fb981447 100644
--- a/roomserver/internal/query/query.go
+++ b/roomserver/internal/query/query.go
@@ -70,6 +70,7 @@ func (r *Queryer) QueryStateAfterEvents(
if err != nil {
switch err.(type) {
case types.MissingEventError:
+ util.GetLogger(ctx).Errorf("QueryStateAfterEvents: MissingEventError: %s", err)
return nil
default:
return err
diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go
index 1ff1fc82..f2510c75 100644
--- a/roomserver/inthttp/client.go
+++ b/roomserver/inthttp/client.go
@@ -149,12 +149,15 @@ func (h *httpRoomserverInternalAPI) InputRoomEvents(
ctx context.Context,
request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse,
-) error {
+) {
span, ctx := opentracing.StartSpanFromContext(ctx, "InputRoomEvents")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverInputRoomEventsPath
- return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+ err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+ if err != nil {
+ response.ErrMsg = err.Error()
+ }
}
func (h *httpRoomserverInternalAPI) PerformInvite(
diff --git a/roomserver/inthttp/server.go b/roomserver/inthttp/server.go
index 5816d4d8..8ffa9cf9 100644
--- a/roomserver/inthttp/server.go
+++ b/roomserver/inthttp/server.go
@@ -20,9 +20,7 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
- if err := r.InputRoomEvents(req.Context(), &request, &response); err != nil {
- return util.ErrorResponse(err)
- }
+ r.InputRoomEvents(req.Context(), &request, &response)
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go
index ef590100..912c5852 100644
--- a/roomserver/roomserver_test.go
+++ b/roomserver/roomserver_test.go
@@ -140,14 +140,6 @@ func mustCreateEvents(t *testing.T, roomVer gomatrixserverlib.RoomVersion, event
return
}
-func eventsJSON(events []gomatrixserverlib.Event) []json.RawMessage {
- result := make([]json.RawMessage, len(events))
- for i := range events {
- result[i] = events[i].JSON()
- }
- return result
-}
-
func mustLoadRawEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []json.RawMessage) []gomatrixserverlib.HeaderedEvent {
t.Helper()
hs := make([]gomatrixserverlib.HeaderedEvent, len(events))
diff --git a/roomserver/state/state.go b/roomserver/state/state.go
index 37e6807a..9ee6f40d 100644
--- a/roomserver/state/state.go
+++ b/roomserver/state/state.go
@@ -159,7 +159,7 @@ func (v StateResolution) LoadCombinedStateAfterEvents(
}
fullState = append(fullState, entries...)
}
- if prevState.IsStateEvent() {
+ if prevState.IsStateEvent() && !prevState.IsRejected {
// If the prev event was a state event then add an entry for the event itself
// so that we get the state after the event rather than the state before.
fullState = append(fullState, prevState.StateEntry)
@@ -523,6 +523,7 @@ func init() {
func (v StateResolution) CalculateAndStoreStateBeforeEvent(
ctx context.Context,
event gomatrixserverlib.Event,
+ isRejected bool,
) (types.StateSnapshotNID, error) {
// Load the state at the prev events.
prevEventRefs := event.PrevEvents()
@@ -561,7 +562,7 @@ func (v StateResolution) CalculateAndStoreStateAfterEvents(
if len(prevStates) == 1 {
prevState := prevStates[0]
- if prevState.EventStateKeyNID == 0 {
+ if prevState.EventStateKeyNID == 0 || prevState.IsRejected {
// 3) None of the previous events were state events and they all
// have the same state, so this event has exactly the same state
// as the previous events.
diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go
index be724da6..10a380e8 100644
--- a/roomserver/storage/interface.go
+++ b/roomserver/storage/interface.go
@@ -70,6 +70,7 @@ type Database interface {
// Stores a matrix room event in the database. Returns the room NID, the state snapshot and the redacted event ID if any, or an error.
StoreEvent(
ctx context.Context, event gomatrixserverlib.Event, txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID,
+ isRejected bool,
) (types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error)
// Look up the state entries for a list of string event IDs
// Returns an error if the there is an error talking to the database
diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go
index e66efb09..c8eb8e2d 100644
--- a/roomserver/storage/postgres/events_table.go
+++ b/roomserver/storage/postgres/events_table.go
@@ -65,13 +65,14 @@ CREATE TABLE IF NOT EXISTS roomserver_events (
-- Needed for setting reference hashes when sending new events.
reference_sha256 BYTEA NOT NULL,
-- A list of numeric IDs for events that can authenticate this event.
- auth_event_nids BIGINT[] NOT NULL
+ auth_event_nids BIGINT[] NOT NULL,
+ is_rejected BOOLEAN NOT NULL DEFAULT FALSE
);
`
const insertEventSQL = "" +
- "INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth)" +
- " VALUES ($1, $2, $3, $4, $5, $6, $7)" +
+ "INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)" +
+ " VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" +
" ON CONFLICT ON CONSTRAINT roomserver_event_id_unique" +
" DO NOTHING" +
" RETURNING event_nid, state_snapshot_nid"
@@ -88,7 +89,7 @@ const bulkSelectStateEventByIDSQL = "" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
const bulkSelectStateAtEventByIDSQL = "" +
- "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid FROM roomserver_events" +
+ "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" +
" WHERE event_id = ANY($1)"
const updateEventStateSQL = "" +
@@ -174,12 +175,14 @@ func (s *eventStatements) InsertEvent(
referenceSHA256 []byte,
authEventNIDs []types.EventNID,
depth int64,
+ isRejected bool,
) (types.EventNID, types.StateSnapshotNID, error) {
var eventNID int64
var stateNID int64
err := s.insertEventStmt.QueryRowContext(
ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID),
eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth,
+ isRejected,
).Scan(&eventNID, &stateNID)
return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err
}
@@ -255,6 +258,7 @@ func (s *eventStatements) BulkSelectStateAtEventByID(
&result.EventStateKeyNID,
&result.EventNID,
&result.BeforeStateSnapshotNID,
+ &result.IsRejected,
); err != nil {
return nil, err
}
diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go
index 262b0f2f..e710b99b 100644
--- a/roomserver/storage/shared/storage.go
+++ b/roomserver/storage/shared/storage.go
@@ -382,7 +382,7 @@ func (d *Database) GetLatestEventsForUpdate(
// nolint:gocyclo
func (d *Database) StoreEvent(
ctx context.Context, event gomatrixserverlib.Event,
- txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID,
+ txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID, isRejected bool,
) (types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
var (
roomNID types.RoomNID
@@ -446,6 +446,7 @@ func (d *Database) StoreEvent(
event.EventReference().EventSHA256,
authEventNIDs,
event.Depth(),
+ isRejected,
); err != nil {
if err == sql.ErrNoRows {
// We've already inserted the event so select the numeric event ID
@@ -459,7 +460,9 @@ func (d *Database) StoreEvent(
if err = d.EventJSONTable.InsertEventJSON(ctx, txn, eventNID, event.JSON()); err != nil {
return fmt.Errorf("d.EventJSONTable.InsertEventJSON: %w", err)
}
- redactionEvent, redactedEventID, err = d.handleRedactions(ctx, txn, eventNID, event)
+ if !isRejected { // ignore rejected redaction events
+ redactionEvent, redactedEventID, err = d.handleRedactions(ctx, txn, eventNID, event)
+ }
return nil
})
if err != nil {
diff --git a/roomserver/storage/sqlite3/events_table.go b/roomserver/storage/sqlite3/events_table.go
index a866c85d..773e9ade 100644
--- a/roomserver/storage/sqlite3/events_table.go
+++ b/roomserver/storage/sqlite3/events_table.go
@@ -41,13 +41,14 @@ const eventsSchema = `
depth INTEGER NOT NULL,
event_id TEXT NOT NULL UNIQUE,
reference_sha256 BLOB NOT NULL,
- auth_event_nids TEXT NOT NULL DEFAULT '[]'
+ auth_event_nids TEXT NOT NULL DEFAULT '[]',
+ is_rejected BOOLEAN NOT NULL DEFAULT FALSE
);
`
const insertEventSQL = `
- INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth)
- VALUES ($1, $2, $3, $4, $5, $6, $7)
+ INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT DO NOTHING;
`
@@ -63,7 +64,7 @@ const bulkSelectStateEventByIDSQL = "" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
const bulkSelectStateAtEventByIDSQL = "" +
- "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid FROM roomserver_events" +
+ "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" +
" WHERE event_id IN ($1)"
const updateEventStateSQL = "" +
@@ -150,13 +151,14 @@ func (s *eventStatements) InsertEvent(
referenceSHA256 []byte,
authEventNIDs []types.EventNID,
depth int64,
+ isRejected bool,
) (types.EventNID, types.StateSnapshotNID, error) {
// attempt to insert: the last_row_id is the event NID
var eventNID int64
insertStmt := sqlutil.TxStmt(txn, s.insertEventStmt)
result, err := insertStmt.ExecContext(
ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID),
- eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth,
+ eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth, isRejected,
)
if err != nil {
return 0, 0, err
@@ -261,6 +263,7 @@ func (s *eventStatements) BulkSelectStateAtEventByID(
&result.EventStateKeyNID,
&result.EventNID,
&result.BeforeStateSnapshotNID,
+ &result.IsRejected,
); err != nil {
return nil, err
}
diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go
index adb06212..eba878ba 100644
--- a/roomserver/storage/tables/interface.go
+++ b/roomserver/storage/tables/interface.go
@@ -34,7 +34,10 @@ type EventStateKeys interface {
}
type Events interface {
- InsertEvent(c context.Context, txn *sql.Tx, i types.RoomNID, j types.EventTypeNID, k types.EventStateKeyNID, eventID string, referenceSHA256 []byte, authEventNIDs []types.EventNID, depth int64) (types.EventNID, types.StateSnapshotNID, error)
+ InsertEvent(
+ ctx context.Context, txn *sql.Tx, i types.RoomNID, j types.EventTypeNID, k types.EventStateKeyNID, eventID string,
+ referenceSHA256 []byte, authEventNIDs []types.EventNID, depth int64, isRejected bool,
+ ) (types.EventNID, types.StateSnapshotNID, error)
SelectEvent(ctx context.Context, txn *sql.Tx, eventID string) (types.EventNID, types.StateSnapshotNID, error)
// bulkSelectStateEventByID lookups a list of state events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError
diff --git a/roomserver/types/types.go b/roomserver/types/types.go
index f5b45763..c0fcef65 100644
--- a/roomserver/types/types.go
+++ b/roomserver/types/types.go
@@ -101,6 +101,9 @@ type StateAtEvent struct {
Overwrite bool
// The state before the event.
BeforeStateSnapshotNID StateSnapshotNID
+ // True if this StateEntry is rejected. State resolution should then treat this
+ // StateEntry as being a message event (not a state event).
+ IsRejected bool
// The state entry for the event itself, allows us to calculate the state after the event.
StateEntry
}