aboutsummaryrefslogtreecommitdiff
path: root/roomserver
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-02-10 10:05:14 +0000
committerGitHub <noreply@github.com>2022-02-10 10:05:14 +0000
commit2782ae3d5635de4eaa4b8b4ca5b1a289745ba554 (patch)
tree83371bab910e166352cd947a70e3ffd6152ba265 /roomserver
parent9130156b131ec5a1714f4a7f6e54f12657d655a1 (diff)
Fix fetching missing state (#2163)
* Check that we have a populated state snapshot when determining if we closed the gap * Do the same in the query API * Use HasState more opportunistically * Try to avoid falling down the hole of using a trustworthy but empty state snapshot for non-create events * Refactor missing state and make sure that we really solve the problem for the new event * Comments * Review comments * Tweak that check again * Tidy up that create check further * Fix build hopefully * Update sendOutliers to use OrderAuthAndStateEvents * Don't go out of bounds on missingEvents
Diffstat (limited to 'roomserver')
-rw-r--r--roomserver/internal/input/input_events.go23
-rw-r--r--roomserver/internal/input/input_missing.go308
-rw-r--r--roomserver/internal/query/query.go2
-rw-r--r--roomserver/storage/shared/room_updater.go6
-rw-r--r--roomserver/storage/shared/storage.go2
-rw-r--r--roomserver/types/types.go4
6 files changed, 223 insertions, 122 deletions
diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go
index 774e71dd..873a051c 100644
--- a/roomserver/internal/input/input_events.go
+++ b/roomserver/internal/input/input_events.go
@@ -255,13 +255,32 @@ func (r *Inputer) processRoomEvent(
hadEvents: map[string]bool{},
haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{},
}
- if err := missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil {
+ if stateSnapshot, err := missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil {
+ // Something went wrong with retrieving the missing state, so we can't
+ // really do anything with the event other than reject it at this point.
isRejected = true
rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err)
+ } else if stateSnapshot != nil {
+ // We retrieved some state and we ended up having to call /state_ids for
+ // the new event in question (probably because closing the gap by using
+ // /get_missing_events didn't do what we hoped) so we'll instead overwrite
+ // the state snapshot with the newly resolved state.
+ missingPrev = false
+ input.HasState = true
+ input.StateEventIDs = make([]string, 0, len(stateSnapshot.StateEvents))
+ for _, e := range stateSnapshot.StateEvents {
+ input.StateEventIDs = append(input.StateEventIDs, e.EventID())
+ }
} else {
+ // We retrieved some state and it would appear that rolling forward the
+ // state did everything we needed it to do, so we can just resolve the
+ // state for the event in the normal way.
missingPrev = false
}
} else {
+ // We're missing prev events or state for the event, but for some reason
+ // we don't know any servers to ask. In this case we can't do anything but
+ // reject the event and hope that it gets unrejected later.
isRejected = true
rejectionErr = fmt.Errorf("missing prev events and no other servers to ask")
}
@@ -299,7 +318,7 @@ func (r *Inputer) processRoomEvent(
return rollbackTransaction, fmt.Errorf("updater.RoomInfo missing for room %s", event.RoomID())
}
- if !missingPrev && stateAtEvent.BeforeStateSnapshotNID == 0 {
+ if input.HasState || (!missingPrev && stateAtEvent.BeforeStateSnapshotNID == 0) {
// We haven't calculated a state for this event yet.
// Lets calculate one.
err = r.calculateAndSetState(ctx, updater, input, roomInfo, &stateAtEvent, event, isRejected)
diff --git a/roomserver/internal/input/input_missing.go b/roomserver/internal/input/input_missing.go
index 497c049d..19771d4b 100644
--- a/roomserver/internal/input/input_missing.go
+++ b/roomserver/internal/input/input_missing.go
@@ -40,9 +40,10 @@ type missingStateReq struct {
// processEventWithMissingState is the entrypoint for a missingStateReq
// request, as called from processRoomEvent.
+// nolint:gocyclo
func (t *missingStateReq) processEventWithMissingState(
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
-) error {
+) (*parsedRespState, error) {
// We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap:
@@ -68,15 +69,15 @@ func (t *missingStateReq) processEventWithMissingState(
// - fill in the gap completely then process event `e` returning no backwards extremity
// - fail to fill in the gap and tell us to terminate the transaction err=not nil
// - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
- newEvents, isGapFilled, err := t.getMissingEvents(ctx, e, roomVersion)
+ newEvents, isGapFilled, prevStatesKnown, err := t.getMissingEvents(ctx, e, roomVersion)
if err != nil {
- return fmt.Errorf("t.getMissingEvents: %w", err)
+ return nil, fmt.Errorf("t.getMissingEvents: %w", err)
}
if len(newEvents) == 0 {
- return fmt.Errorf("expected to find missing events but didn't")
+ return nil, fmt.Errorf("expected to find missing events but didn't")
}
if isGapFilled {
- logger.Infof("gap filled by /get_missing_events, injecting %d new events", len(newEvents))
+ logger.Infof("Gap filled by /get_missing_events, injecting %d new events", len(newEvents))
// we can just inject all the newEvents as new as we may have only missed 1 or 2 events and have filled
// in the gap in the DAG
for _, newEvent := range newEvents {
@@ -88,16 +89,141 @@ func (t *missingStateReq) processEventWithMissingState(
})
if err != nil {
if _, ok := err.(types.RejectedError); !ok {
- return fmt.Errorf("t.inputer.processRoomEvent (filling gap): %w", err)
+ return nil, fmt.Errorf("t.inputer.processRoomEvent (filling gap): %w", err)
}
}
}
- return nil
}
+ // If we filled the gap *and* we know the state before the prev events
+ // then there's nothing else to do, we have everything we need to deal
+ // with the new event.
+ if isGapFilled && prevStatesKnown {
+ logger.Infof("Gap filled and state found for all prev events")
+ return nil, nil
+ }
+
+ // Otherwise, if we've reached this point, it's possible that we've
+ // either not closed the gap, or we did but we still don't seem to
+ // know the events before the new event. Start by looking up the
+ // state at the event at the back of the gap and we'll try to roll
+ // forward the state first.
backwardsExtremity := newEvents[0]
newEvents = newEvents[1:]
+ resolvedState, err := t.lookupResolvedStateBeforeEvent(ctx, backwardsExtremity, roomVersion)
+ if err != nil {
+ return nil, fmt.Errorf("t.lookupState (backwards extremity): %w", err)
+ }
+
+ hadEvents := map[string]bool{}
+ t.hadEventsMutex.Lock()
+ for k, v := range t.hadEvents {
+ hadEvents[k] = v
+ }
+ t.hadEventsMutex.Unlock()
+
+ sendOutliers := func(resolvedState *parsedRespState) error {
+ outliers, oerr := gomatrixserverlib.OrderAuthAndStateEvents(resolvedState.AuthEvents, resolvedState.StateEvents, roomVersion)
+ if oerr != nil {
+ return fmt.Errorf("gomatrixserverlib.OrderAuthAndStateEvents: %w", oerr)
+ }
+ var outlierRoomEvents []api.InputRoomEvent
+ for _, outlier := range outliers {
+ if hadEvents[outlier.EventID()] {
+ continue
+ }
+ outlierRoomEvents = append(outlierRoomEvents, api.InputRoomEvent{
+ Kind: api.KindOutlier,
+ Event: outlier.Headered(roomVersion),
+ Origin: t.origin,
+ })
+ }
+ for _, ire := range outlierRoomEvents {
+ _, err = t.inputer.processRoomEvent(ctx, t.db, &ire)
+ if err != nil {
+ if _, ok := err.(types.RejectedError); !ok {
+ return fmt.Errorf("t.inputer.processRoomEvent (outlier): %w", err)
+ }
+ }
+ }
+ return nil
+ }
+
+ // Send outliers first so we can send the state along with the new backwards
+ // extremity without any missing auth events.
+ if err = sendOutliers(resolvedState); err != nil {
+ return nil, fmt.Errorf("sendOutliers: %w", err)
+ }
+
+ // Now send the backward extremity into the roomserver with the
+ // newly resolved state. This marks the "oldest" point in the backfill and
+ // sets the baseline state for any new events after this.
+ stateIDs := make([]string, 0, len(resolvedState.StateEvents))
+ for _, event := range resolvedState.StateEvents {
+ stateIDs = append(stateIDs, event.EventID())
+ }
+
+ _, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
+ Kind: api.KindOld,
+ Event: backwardsExtremity.Headered(roomVersion),
+ Origin: t.origin,
+ HasState: true,
+ StateEventIDs: stateIDs,
+ SendAsServer: api.DoNotSendToOtherServers,
+ })
+ if err != nil {
+ if _, ok := err.(types.RejectedError); !ok {
+ return nil, fmt.Errorf("t.inputer.processRoomEvent (backward extremity): %w", err)
+ }
+ }
+
+ // Then send all of the newer backfilled events, of which will all be newer
+ // than the backward extremity, into the roomserver without state. This way
+ // they will automatically fast-forward based on the room state at the
+ // extremity in the last step.
+ for _, newEvent := range newEvents {
+ _, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
+ Kind: api.KindOld,
+ Event: newEvent.Headered(roomVersion),
+ Origin: t.origin,
+ SendAsServer: api.DoNotSendToOtherServers,
+ })
+ if err != nil {
+ if _, ok := err.(types.RejectedError); !ok {
+ return nil, fmt.Errorf("t.inputer.processRoomEvent (fast forward): %w", err)
+ }
+ }
+ }
+
+ // Finally, check again if we know everything we need to know in order to
+ // make forward progress. If the prev state is known then we consider the
+ // rolled forward state to be sufficient — we now know all of the state
+ // before the prev events. If we don't then we need to look up the state
+ // before the new event as well, otherwise we will never make any progress.
+ if t.isPrevStateKnown(ctx, e) {
+ return nil, nil
+ }
+
+ // If we still haven't got the state for the prev events then we'll go and
+ // ask the federation for it if needed.
+ resolvedState, err = t.lookupResolvedStateBeforeEvent(ctx, e, roomVersion)
+ if err != nil {
+ return nil, fmt.Errorf("t.lookupState (new event): %w", err)
+ }
+
+ // Send the outliers for the retrieved state.
+ if err = sendOutliers(resolvedState); err != nil {
+ return nil, fmt.Errorf("sendOutliers: %w", err)
+ }
+
+ // Then return the resolved state, for which the caller can replace the
+ // HasState with the event IDs to create a new state snapshot when we
+ // process the new event.
+ return resolvedState, nil
+}
+
+func (t *missingStateReq) lookupResolvedStateBeforeEvent(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (*parsedRespState, error) {
type respState struct {
// A snapshot is considered trustworthy if it came from our own roomserver.
// That's because the state will have been through state resolution once
@@ -110,14 +236,13 @@ func (t *missingStateReq) processEventWithMissingState(
// Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query
// the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event.
var states []*respState
- for _, prevEventID := range backwardsExtremity.PrevEventIDs() {
+ for _, prevEventID := range e.PrevEventIDs() {
// Look up what the state is after the backward extremity. This will either
// come from the roomserver, if we know all the required events, or it will
// come from a remote server via /state_ids if not.
- prevState, trustworthy, lerr := t.lookupStateAfterEvent(ctx, roomVersion, backwardsExtremity.RoomID(), prevEventID)
- if lerr != nil {
- logger.WithError(lerr).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
- return lerr
+ prevState, trustworthy, err := t.lookupStateAfterEvent(ctx, roomVersion, e.RoomID(), prevEventID)
+ if err != nil {
+ return nil, fmt.Errorf("t.lookupStateAfterEvent: %w", err)
}
// Append the state onto the collected state. We'll run this through the
// state resolution next.
@@ -133,18 +258,20 @@ func (t *missingStateReq) processEventWithMissingState(
resolvedState := &parsedRespState{}
switch len(states) {
case 0:
- extremityIsCreate := backwardsExtremity.Type() == gomatrixserverlib.MRoomCreate && backwardsExtremity.StateKeyEquals("")
+ extremityIsCreate := e.Type() == gomatrixserverlib.MRoomCreate && e.StateKeyEquals("")
if !extremityIsCreate {
// There are no previous states and this isn't the beginning of the
// room - this is an error condition!
- logger.Errorf("Failed to lookup any state after prev_events")
- return fmt.Errorf("expected %d states but got %d", len(backwardsExtremity.PrevEventIDs()), len(states))
+ return nil, fmt.Errorf("expected %d states but got %d", len(e.PrevEventIDs()), len(states))
}
case 1:
// There's only one previous state - if it's trustworthy (came from a
// local state snapshot which will already have been through state res),
- // use it as-is. There's no point in resolving it again.
- if states[0].trustworthy {
+ // use it as-is. There's no point in resolving it again. Only trust a
+ // trustworthy state snapshot if it actually contains some state for all
+ // non-create events, otherwise we need to resolve what came from federation.
+ isCreate := e.Type() == gomatrixserverlib.MRoomCreate && e.StateKeyEquals("")
+ if states[0].trustworthy && (isCreate || len(states[0].StateEvents) > 0) {
resolvedState = states[0].parsedRespState
break
}
@@ -157,89 +284,16 @@ func (t *missingStateReq) processEventWithMissingState(
respStates[i] = states[i].parsedRespState
}
// There's more than one previous state - run them all through state res
+ var err error
t.roomsMu.Lock(e.RoomID())
- resolvedState, err = t.resolveStatesAndCheck(ctx, roomVersion, respStates, backwardsExtremity)
+ resolvedState, err = t.resolveStatesAndCheck(ctx, roomVersion, respStates, e)
t.roomsMu.Unlock(e.RoomID())
if err != nil {
- logger.WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())
- return err
- }
- }
-
- hadEvents := map[string]bool{}
- t.hadEventsMutex.Lock()
- for k, v := range t.hadEvents {
- hadEvents[k] = v
- }
- t.hadEventsMutex.Unlock()
-
- // Send outliers first so we can send the new backwards extremity without causing errors
- outliers, err := gomatrixserverlib.OrderAuthAndStateEvents(resolvedState.AuthEvents, resolvedState.StateEvents, roomVersion)
- if err != nil {
- return err
- }
- var outlierRoomEvents []api.InputRoomEvent
- for _, outlier := range outliers {
- if hadEvents[outlier.EventID()] {
- continue
- }
- outlierRoomEvents = append(outlierRoomEvents, api.InputRoomEvent{
- Kind: api.KindOutlier,
- Event: outlier.Headered(roomVersion),
- Origin: t.origin,
- })
- }
- // TODO: we could do this concurrently?
- for _, ire := range outlierRoomEvents {
- _, err = t.inputer.processRoomEvent(ctx, t.db, &ire)
- if err != nil {
- if _, ok := err.(types.RejectedError); !ok {
- return fmt.Errorf("t.inputer.processRoomEvent (outlier): %w", err)
- }
- }
- }
-
- // Now send the backward extremity into the roomserver with the
- // newly resolved state. This marks the "oldest" point in the backfill and
- // sets the baseline state for any new events after this.
- stateIDs := make([]string, 0, len(resolvedState.StateEvents))
- for _, event := range resolvedState.StateEvents {
- stateIDs = append(stateIDs, event.EventID())
- }
-
- _, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
- Kind: api.KindOld,
- Event: backwardsExtremity.Headered(roomVersion),
- Origin: t.origin,
- HasState: true,
- StateEventIDs: stateIDs,
- SendAsServer: api.DoNotSendToOtherServers,
- })
- if err != nil {
- if _, ok := err.(types.RejectedError); !ok {
- return fmt.Errorf("t.inputer.processRoomEvent (backward extremity): %w", err)
- }
- }
-
- // Then send all of the newer backfilled events, of which will all be newer
- // than the backward extremity, into the roomserver without state. This way
- // they will automatically fast-forward based on the room state at the
- // extremity in the last step.
- for _, newEvent := range newEvents {
- _, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
- Kind: api.KindOld,
- Event: newEvent.Headered(roomVersion),
- Origin: t.origin,
- SendAsServer: api.DoNotSendToOtherServers,
- })
- if err != nil {
- if _, ok := err.(types.RejectedError); !ok {
- return fmt.Errorf("t.inputer.processRoomEvent (fast forward): %w", err)
- }
+ return nil, fmt.Errorf("t.resolveStatesAndCheck: %w", err)
}
}
- return nil
+ return resolvedState, nil
}
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
@@ -408,7 +462,7 @@ retryAllowedState:
// get missing events for `e`. If `isGapFilled`=true then `newEvents` contains all the events to inject,
// without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events
-func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled bool, err error) {
+func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled, prevStateKnown bool, err error) {
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
latest := t.db.LatestEvents()
@@ -435,7 +489,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
if errors.Is(err, context.DeadlineExceeded) {
select {
case <-ctx.Done(): // the parent request context timed out
- return nil, false, context.DeadlineExceeded
+ return nil, false, false, context.DeadlineExceeded
default: // this request exceed its own timeout
continue
}
@@ -448,7 +502,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
"%s pushed us an event but %d server(s) couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
t.origin, len(t.servers),
)
- return nil, false, missingPrevEventsError{
+ return nil, false, false, missingPrevEventsError{
eventID: e.EventID(),
err: err,
}
@@ -457,17 +511,9 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
// Make sure events from the missingResp are using the cache - missing events
// will be added and duplicates will be removed.
logger.Debugf("get_missing_events returned %d events", len(missingResp.Events))
- missingEvents := make([]*gomatrixserverlib.Event, len(missingResp.Events))
- for i, evJSON := range missingResp.Events {
- ev, err := gomatrixserverlib.NewEventFromUntrustedJSON(evJSON, roomVersion)
- if err != nil {
- logger.WithError(err).WithField("event", string(evJSON)).Warn("NewEventFromUntrustedJSON: failed")
- return nil, false, missingPrevEventsError{
- eventID: e.EventID(),
- err: err,
- }
- }
- missingEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
+ missingEvents := make([]*gomatrixserverlib.Event, 0, len(missingResp.Events))
+ for _, ev := range missingResp.Events.UntrustedEvents(roomVersion) {
+ missingEvents = append(missingEvents, t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap())
}
// topologically sort and sanity check that we are making forward progress
@@ -489,27 +535,51 @@ Event:
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
t.origin,
)
- return nil, false, missingPrevEventsError{
+ return nil, false, false, missingPrevEventsError{
eventID: e.EventID(),
err: err,
}
}
if len(newEvents) == 0 {
- return nil, false, nil // TODO: error instead?
+ return nil, false, false, nil // TODO: error instead?
}
- // now check if we can fill the gap. Look to see if we have state snapshot IDs for the earliest event
earliestNewEvent := newEvents[0]
- if state, err := t.db.StateAtEventIDs(ctx, []string{earliestNewEvent.EventID()}); err != nil || len(state) == 0 {
- if earliestNewEvent.Type() == gomatrixserverlib.MRoomCreate && earliestNewEvent.StateKeyEquals("") {
- // we got to the beginning of the room so there will be no state! It's all good we can process this
- return newEvents, true, nil
+
+ // If we retrieved back to the beginning of the room then there's nothing else
+ // to do - we closed the gap.
+ if len(earliestNewEvent.PrevEventIDs()) == 0 && earliestNewEvent.Type() == gomatrixserverlib.MRoomCreate && earliestNewEvent.StateKeyEquals("") {
+ return newEvents, true, t.isPrevStateKnown(ctx, e), nil
+ }
+
+ // If our backward extremity was not a known event to us then we obviously didn't
+ // close the gap.
+ if state, err := t.db.StateAtEventIDs(ctx, []string{earliestNewEvent.EventID()}); err != nil || len(state) == 0 && state[0].BeforeStateSnapshotNID == 0 {
+ return newEvents, false, false, nil
+ }
+
+ // At this point we are satisfied that we know the state both at the earliest
+ // retrieved event and at the prev events of the new event.
+ return newEvents, true, t.isPrevStateKnown(ctx, e), nil
+}
+
+func (t *missingStateReq) isPrevStateKnown(ctx context.Context, e *gomatrixserverlib.Event) bool {
+ expected := len(e.PrevEventIDs())
+ state, err := t.db.StateAtEventIDs(ctx, e.PrevEventIDs())
+ if err != nil || len(state) != expected {
+ // We didn't get as many state snapshots as we expected, or there was an error,
+ // so we haven't completely solved the problem for the new event.
+ return false
+ }
+ // Check to see if we have a populated state snapshot for all of the prev events.
+ for _, stateAtEvent := range state {
+ if stateAtEvent.BeforeStateSnapshotNID == 0 {
+ // One of the prev events still has unknown state, so we haven't really
+ // solved the problem.
+ return false
}
- // we don't have the state at this earliest event from /g_m_e so we won't have state for later events either
- return newEvents, false, nil
}
- // StateAtEventIDs returned some kind of state for the earliest event so we can fill in the gap!
- return newEvents, true, nil
+ return true
}
func (t *missingStateReq) lookupMissingStateViaState(
diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go
index 84553303..05cd686f 100644
--- a/roomserver/internal/query/query.go
+++ b/roomserver/internal/query/query.go
@@ -150,7 +150,7 @@ func (r *Queryer) QueryMissingAuthPrevEvents(
for _, prevEventID := range request.PrevEventIDs {
state, err := r.DB.StateAtEventIDs(ctx, []string{prevEventID})
- if err != nil || len(state) == 0 {
+ if err != nil || len(state) == 0 || (!state[0].IsCreate() && state[0].BeforeStateSnapshotNID == 0) {
response.MissingPrevEventIDs = append(response.MissingPrevEventIDs, prevEventID)
}
}
diff --git a/roomserver/storage/shared/room_updater.go b/roomserver/storage/shared/room_updater.go
index bb9f5dc6..fc75a260 100644
--- a/roomserver/storage/shared/room_updater.go
+++ b/roomserver/storage/shared/room_updater.go
@@ -187,6 +187,12 @@ func (u *RoomUpdater) EventIDs(
return u.d.EventsTable.BulkSelectEventID(ctx, u.txn, eventNIDs)
}
+func (u *RoomUpdater) EventNIDs(
+ ctx context.Context, eventIDs []string,
+) (map[string]types.EventNID, error) {
+ return u.d.eventNIDs(ctx, u.txn, eventIDs)
+}
+
func (u *RoomUpdater) StateAtEventIDs(
ctx context.Context, eventIDs []string,
) ([]types.StateAtEvent, error) {
diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go
index 127cd1f5..8319de26 100644
--- a/roomserver/storage/shared/storage.go
+++ b/roomserver/storage/shared/storage.go
@@ -603,6 +603,8 @@ func (d *Database) storeEvent(
if err == sql.ErrNoRows {
// We've already inserted the event so select the numeric event ID
eventNID, stateNID, err = d.EventsTable.SelectEvent(ctx, txn, event.EventID())
+ } else if err != nil {
+ return fmt.Errorf("d.EventsTable.InsertEvent: %w", err)
}
if err != nil {
return fmt.Errorf("d.EventsTable.SelectEvent: %w", err)
diff --git a/roomserver/types/types.go b/roomserver/types/types.go
index 5e1eebe9..5d52ccfc 100644
--- a/roomserver/types/types.go
+++ b/roomserver/types/types.go
@@ -83,6 +83,10 @@ type StateKeyTuple struct {
EventStateKeyNID EventStateKeyNID
}
+func (a StateKeyTuple) IsCreate() bool {
+ return a.EventTypeNID == MRoomCreateNID && a.EventStateKeyNID == EmptyStateKeyNID
+}
+
// LessThan returns true if this state key is less than the other state key.
// The ordering is arbitrary and is used to implement binary search and to efficiently deduplicate entries.
func (a StateKeyTuple) LessThan(b StateKeyTuple) bool {