aboutsummaryrefslogtreecommitdiff
path: root/syncapi/routing
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-09-30 12:48:10 +0100
committerGitHub <noreply@github.com>2022-09-30 12:48:10 +0100
commit6348486a1365c7469a498101f5035a9b6bd16d22 (patch)
treed8a5ba572c5fc4fdec383802de5fac3a5e13c24d /syncapi/routing
parent8a82f100460dc5ca7bd39ae2345c251d6622c494 (diff)
Transactional isolation for `/sync` (#2745)
This should transactional snapshot isolation for `/sync` etc requests. For now we don't use repeatable read due to some odd test failures with invites.
Diffstat (limited to 'syncapi/routing')
-rw-r--r--syncapi/routing/context.go30
-rw-r--r--syncapi/routing/messages.go42
-rw-r--r--syncapi/routing/search.go24
3 files changed, 61 insertions, 35 deletions
diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go
index 1ebdfe60..1ce34b85 100644
--- a/syncapi/routing/context.go
+++ b/syncapi/routing/context.go
@@ -51,6 +51,12 @@ func Context(
roomID, eventID string,
lazyLoadCache caching.LazyLoadCache,
) util.JSONResponse {
+ snapshot, err := syncDB.NewDatabaseSnapshot(req.Context())
+ if err != nil {
+ return jsonerror.InternalServerError()
+ }
+ defer snapshot.Rollback() // nolint:errcheck
+
filter, err := parseRoomEventFilter(req)
if err != nil {
errMsg := ""
@@ -97,7 +103,7 @@ func Context(
ContainsURL: filter.ContainsURL,
}
- id, requestedEvent, err := syncDB.SelectContextEvent(ctx, roomID, eventID)
+ id, requestedEvent, err := snapshot.SelectContextEvent(ctx, roomID, eventID)
if err != nil {
if err == sql.ErrNoRows {
return util.JSONResponse{
@@ -111,7 +117,7 @@ func Context(
// verify the user is allowed to see the context for this room/event
startTime := time.Now()
- filteredEvents, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, rsAPI, []*gomatrixserverlib.HeaderedEvent{&requestedEvent}, nil, device.UserID, "context")
+ filteredEvents, err := internal.ApplyHistoryVisibilityFilter(ctx, snapshot, rsAPI, []*gomatrixserverlib.HeaderedEvent{&requestedEvent}, nil, device.UserID, "context")
if err != nil {
logrus.WithError(err).Error("unable to apply history visibility filter")
return jsonerror.InternalServerError()
@@ -127,20 +133,20 @@ func Context(
}
}
- eventsBefore, err := syncDB.SelectContextBeforeEvent(ctx, id, roomID, filter)
+ eventsBefore, err := snapshot.SelectContextBeforeEvent(ctx, id, roomID, filter)
if err != nil && err != sql.ErrNoRows {
logrus.WithError(err).Error("unable to fetch before events")
return jsonerror.InternalServerError()
}
- _, eventsAfter, err := syncDB.SelectContextAfterEvent(ctx, id, roomID, filter)
+ _, eventsAfter, err := snapshot.SelectContextAfterEvent(ctx, id, roomID, filter)
if err != nil && err != sql.ErrNoRows {
logrus.WithError(err).Error("unable to fetch after events")
return jsonerror.InternalServerError()
}
startTime = time.Now()
- eventsBeforeFiltered, eventsAfterFiltered, err := applyHistoryVisibilityOnContextEvents(ctx, syncDB, rsAPI, eventsBefore, eventsAfter, device.UserID)
+ eventsBeforeFiltered, eventsAfterFiltered, err := applyHistoryVisibilityOnContextEvents(ctx, snapshot, rsAPI, eventsBefore, eventsAfter, device.UserID)
if err != nil {
logrus.WithError(err).Error("unable to apply history visibility filter")
return jsonerror.InternalServerError()
@@ -152,7 +158,7 @@ func Context(
}).Debug("applied history visibility (context eventsBefore/eventsAfter)")
// TODO: Get the actual state at the last event returned by SelectContextAfterEvent
- state, err := syncDB.CurrentState(ctx, roomID, &stateFilter, nil)
+ state, err := snapshot.CurrentState(ctx, roomID, &stateFilter, nil)
if err != nil {
logrus.WithError(err).Error("unable to fetch current room state")
return jsonerror.InternalServerError()
@@ -173,7 +179,7 @@ func Context(
if len(response.State) > filter.Limit {
response.State = response.State[len(response.State)-filter.Limit:]
}
- start, end, err := getStartEnd(ctx, syncDB, eventsBefore, eventsAfter)
+ start, end, err := getStartEnd(ctx, snapshot, eventsBefore, eventsAfter)
if err == nil {
response.End = end.String()
response.Start = start.String()
@@ -188,7 +194,7 @@ func Context(
// by combining the events before and after the context event. Returns the filtered events,
// and an error, if any.
func applyHistoryVisibilityOnContextEvents(
- ctx context.Context, syncDB storage.Database, rsAPI roomserver.SyncRoomserverAPI,
+ ctx context.Context, snapshot storage.DatabaseTransaction, rsAPI roomserver.SyncRoomserverAPI,
eventsBefore, eventsAfter []*gomatrixserverlib.HeaderedEvent,
userID string,
) (filteredBefore, filteredAfter []*gomatrixserverlib.HeaderedEvent, err error) {
@@ -205,7 +211,7 @@ func applyHistoryVisibilityOnContextEvents(
}
allEvents := append(eventsBefore, eventsAfter...)
- filteredEvents, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, rsAPI, allEvents, nil, userID, "context")
+ filteredEvents, err := internal.ApplyHistoryVisibilityFilter(ctx, snapshot, rsAPI, allEvents, nil, userID, "context")
if err != nil {
return nil, nil, err
}
@@ -222,15 +228,15 @@ func applyHistoryVisibilityOnContextEvents(
return filteredBefore, filteredAfter, nil
}
-func getStartEnd(ctx context.Context, syncDB storage.Database, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) {
+func getStartEnd(ctx context.Context, snapshot storage.DatabaseTransaction, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) {
if len(startEvents) > 0 {
- start, err = syncDB.EventPositionInTopology(ctx, startEvents[0].EventID())
+ start, err = snapshot.EventPositionInTopology(ctx, startEvents[0].EventID())
if err != nil {
return
}
}
if len(endEvents) > 0 {
- end, err = syncDB.EventPositionInTopology(ctx, endEvents[0].EventID())
+ end, err = snapshot.EventPositionInTopology(ctx, endEvents[0].EventID())
}
return
}
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index 03614302..8f3ed3f5 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/caching"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/internal"
@@ -39,6 +40,7 @@ import (
type messagesReq struct {
ctx context.Context
db storage.Database
+ snapshot storage.DatabaseTransaction
rsAPI api.SyncRoomserverAPI
cfg *config.SyncAPI
roomID string
@@ -70,6 +72,16 @@ func OnIncomingMessagesRequest(
) util.JSONResponse {
var err error
+ // NewDatabaseTransaction is used here instead of NewDatabaseSnapshot as we
+ // expect to be able to write to the database in response to a /messages
+ // request that requires backfilling from the roomserver or federation.
+ snapshot, err := db.NewDatabaseTransaction(req.Context())
+ if err != nil {
+ return jsonerror.InternalServerError()
+ }
+ var succeeded bool
+ defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err)
+
// check if the user has already forgotten about this room
isForgotten, roomExists, err := checkIsRoomForgotten(req.Context(), roomID, device.UserID, rsAPI)
if err != nil {
@@ -132,7 +144,7 @@ func OnIncomingMessagesRequest(
}
} else {
fromStream = &streamToken
- from, err = db.StreamToTopologicalPosition(req.Context(), roomID, streamToken.PDUPosition, backwardOrdering)
+ from, err = snapshot.StreamToTopologicalPosition(req.Context(), roomID, streamToken.PDUPosition, backwardOrdering)
if err != nil {
logrus.WithError(err).Errorf("Failed to get topological position for streaming token %v", streamToken)
return jsonerror.InternalServerError()
@@ -154,7 +166,7 @@ func OnIncomingMessagesRequest(
JSON: jsonerror.InvalidArgumentValue("Invalid to parameter: " + err.Error()),
}
} else {
- to, err = db.StreamToTopologicalPosition(req.Context(), roomID, streamToken.PDUPosition, !backwardOrdering)
+ to, err = snapshot.StreamToTopologicalPosition(req.Context(), roomID, streamToken.PDUPosition, !backwardOrdering)
if err != nil {
logrus.WithError(err).Errorf("Failed to get topological position for streaming token %v", streamToken)
return jsonerror.InternalServerError()
@@ -165,7 +177,7 @@ func OnIncomingMessagesRequest(
// If "to" isn't provided, it defaults to either the earliest stream
// position (if we're going backward) or to the latest one (if we're
// going forward).
- to, err = setToDefault(req.Context(), db, backwardOrdering, roomID)
+ to, err = setToDefault(req.Context(), snapshot, backwardOrdering, roomID)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("setToDefault failed")
return jsonerror.InternalServerError()
@@ -186,6 +198,7 @@ func OnIncomingMessagesRequest(
mReq := messagesReq{
ctx: req.Context(),
db: db,
+ snapshot: snapshot,
rsAPI: rsAPI,
cfg: cfg,
roomID: roomID,
@@ -217,7 +230,7 @@ func OnIncomingMessagesRequest(
Start: start.String(),
End: end.String(),
}
- res.applyLazyLoadMembers(req.Context(), db, roomID, device, filter.LazyLoadMembers, lazyLoadCache)
+ res.applyLazyLoadMembers(req.Context(), snapshot, roomID, device, filter.LazyLoadMembers, lazyLoadCache)
// If we didn't return any events, set the end to an empty string, so it will be omitted
// in the response JSON.
@@ -229,6 +242,7 @@ func OnIncomingMessagesRequest(
}
// Respond with the events.
+ succeeded = true
return util.JSONResponse{
Code: http.StatusOK,
JSON: res,
@@ -239,7 +253,7 @@ func OnIncomingMessagesRequest(
// LazyLoadMembers enabled.
func (m *messagesResp) applyLazyLoadMembers(
ctx context.Context,
- db storage.Database,
+ db storage.DatabaseTransaction,
roomID string,
device *userapi.Device,
lazyLoad bool,
@@ -292,7 +306,7 @@ func (r *messagesReq) retrieveEvents() (
end types.TopologyToken, err error,
) {
// Retrieve the events from the local database.
- streamEvents, err := r.db.GetEventsInTopologicalRange(r.ctx, r.from, r.to, r.roomID, r.filter, r.backwardOrdering)
+ streamEvents, err := r.snapshot.GetEventsInTopologicalRange(r.ctx, r.from, r.to, r.roomID, r.filter, r.backwardOrdering)
if err != nil {
err = fmt.Errorf("GetEventsInRange: %w", err)
return
@@ -348,7 +362,7 @@ func (r *messagesReq) retrieveEvents() (
// Apply room history visibility filter
startTime := time.Now()
- filteredEvents, err := internal.ApplyHistoryVisibilityFilter(r.ctx, r.db, r.rsAPI, events, nil, r.device.UserID, "messages")
+ filteredEvents, err := internal.ApplyHistoryVisibilityFilter(r.ctx, r.snapshot, r.rsAPI, events, nil, r.device.UserID, "messages")
logrus.WithFields(logrus.Fields{
"duration": time.Since(startTime),
"room_id": r.roomID,
@@ -366,7 +380,7 @@ func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (st
// else to go. This seems to fix Element iOS from looping on /messages endlessly.
end = types.TopologyToken{}
} else {
- end, err = r.db.EventPositionInTopology(
+ end, err = r.snapshot.EventPositionInTopology(
r.ctx, events[0].EventID(),
)
// A stream/topological position is a cursor located between two events.
@@ -378,7 +392,7 @@ func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (st
}
} else {
start = *r.from
- end, err = r.db.EventPositionInTopology(
+ end, err = r.snapshot.EventPositionInTopology(
r.ctx, events[len(events)-1].EventID(),
)
}
@@ -399,7 +413,7 @@ func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (st
func (r *messagesReq) handleEmptyEventsSlice() (
events []*gomatrixserverlib.HeaderedEvent, err error,
) {
- backwardExtremities, err := r.db.BackwardExtremitiesForRoom(r.ctx, r.roomID)
+ backwardExtremities, err := r.snapshot.BackwardExtremitiesForRoom(r.ctx, r.roomID)
// Check if we have backward extremities for this room.
if len(backwardExtremities) > 0 {
@@ -443,7 +457,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
}
// Check if the slice contains a backward extremity.
- backwardExtremities, err := r.db.BackwardExtremitiesForRoom(r.ctx, r.roomID)
+ backwardExtremities, err := r.snapshot.BackwardExtremitiesForRoom(r.ctx, r.roomID)
if err != nil {
return
}
@@ -463,7 +477,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
}
// Append the events ve previously retrieved locally.
- events = append(events, r.db.StreamEventsToEvents(nil, streamEvents)...)
+ events = append(events, r.snapshot.StreamEventsToEvents(nil, streamEvents)...)
sort.Sort(eventsByDepth(events))
return
@@ -553,7 +567,7 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
// Returns an error if there was an issue with retrieving the latest position
// from the database
func setToDefault(
- ctx context.Context, db storage.Database, backwardOrdering bool,
+ ctx context.Context, snapshot storage.DatabaseTransaction, backwardOrdering bool,
roomID string,
) (to types.TopologyToken, err error) {
if backwardOrdering {
@@ -561,7 +575,7 @@ func setToDefault(
// this is because Database.GetEventsInTopologicalRange is exclusive of the lower-bound.
to = types.TopologyToken{}
} else {
- to, err = db.MaxTopologicalPosition(ctx, roomID)
+ to, err = snapshot.MaxTopologicalPosition(ctx, roomID)
}
return
diff --git a/syncapi/routing/search.go b/syncapi/routing/search.go
index 341efeb1..bac534a2 100644
--- a/syncapi/routing/search.go
+++ b/syncapi/routing/search.go
@@ -61,8 +61,14 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
searchReq.SearchCategories.RoomEvents.Filter.Limit = 5
}
+ snapshot, err := syncDB.NewDatabaseSnapshot(req.Context())
+ if err != nil {
+ return jsonerror.InternalServerError()
+ }
+ defer snapshot.Rollback() // nolint:errcheck
+
// only search rooms the user is actually joined to
- joinedRooms, err := syncDB.RoomIDsWithMembership(ctx, device.UserID, "join")
+ joinedRooms, err := snapshot.RoomIDsWithMembership(ctx, device.UserID, "join")
if err != nil {
return jsonerror.InternalServerError()
}
@@ -161,12 +167,12 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
stateForRooms := make(map[string][]gomatrixserverlib.ClientEvent)
for _, event := range evs {
- eventsBefore, eventsAfter, err := contextEvents(ctx, syncDB, event, roomFilter, searchReq)
+ eventsBefore, eventsAfter, err := contextEvents(ctx, snapshot, event, roomFilter, searchReq)
if err != nil {
logrus.WithError(err).Error("failed to get context events")
return jsonerror.InternalServerError()
}
- startToken, endToken, err := getStartEnd(ctx, syncDB, eventsBefore, eventsAfter)
+ startToken, endToken, err := getStartEnd(ctx, snapshot, eventsBefore, eventsAfter)
if err != nil {
logrus.WithError(err).Error("failed to get start/end")
return jsonerror.InternalServerError()
@@ -176,7 +182,7 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
for _, ev := range append(eventsBefore, eventsAfter...) {
profile, ok := knownUsersProfiles[event.Sender()]
if !ok {
- stateEvent, err := syncDB.GetStateEvent(ctx, ev.RoomID(), gomatrixserverlib.MRoomMember, ev.Sender())
+ stateEvent, err := snapshot.GetStateEvent(ctx, ev.RoomID(), gomatrixserverlib.MRoomMember, ev.Sender())
if err != nil {
logrus.WithError(err).WithField("user_id", event.Sender()).Warn("failed to query userprofile")
continue
@@ -209,7 +215,7 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
groups[event.RoomID()] = roomGroup
if _, ok := stateForRooms[event.RoomID()]; searchReq.SearchCategories.RoomEvents.IncludeState && !ok {
stateFilter := gomatrixserverlib.DefaultStateFilter()
- state, err := syncDB.CurrentState(ctx, event.RoomID(), &stateFilter, nil)
+ state, err := snapshot.CurrentState(ctx, event.RoomID(), &stateFilter, nil)
if err != nil {
logrus.WithError(err).Error("unable to get current state")
return jsonerror.InternalServerError()
@@ -252,24 +258,24 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
// contextEvents returns the events around a given eventID
func contextEvents(
ctx context.Context,
- syncDB storage.Database,
+ snapshot storage.DatabaseTransaction,
event *gomatrixserverlib.HeaderedEvent,
roomFilter *gomatrixserverlib.RoomEventFilter,
searchReq SearchRequest,
) ([]*gomatrixserverlib.HeaderedEvent, []*gomatrixserverlib.HeaderedEvent, error) {
- id, _, err := syncDB.SelectContextEvent(ctx, event.RoomID(), event.EventID())
+ id, _, err := snapshot.SelectContextEvent(ctx, event.RoomID(), event.EventID())
if err != nil {
logrus.WithError(err).Error("failed to query context event")
return nil, nil, err
}
roomFilter.Limit = searchReq.SearchCategories.RoomEvents.EventContext.BeforeLimit
- eventsBefore, err := syncDB.SelectContextBeforeEvent(ctx, id, event.RoomID(), roomFilter)
+ eventsBefore, err := snapshot.SelectContextBeforeEvent(ctx, id, event.RoomID(), roomFilter)
if err != nil {
logrus.WithError(err).Error("failed to query before context event")
return nil, nil, err
}
roomFilter.Limit = searchReq.SearchCategories.RoomEvents.EventContext.AfterLimit
- _, eventsAfter, err := syncDB.SelectContextAfterEvent(ctx, id, event.RoomID(), roomFilter)
+ _, eventsAfter, err := snapshot.SelectContextAfterEvent(ctx, id, event.RoomID(), roomFilter)
if err != nil {
logrus.WithError(err).Error("failed to query after context event")
return nil, nil, err