aboutsummaryrefslogtreecommitdiff
path: root/syncapi/routing/messages.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/routing/messages.go')
-rw-r--r--syncapi/routing/messages.go42
1 files changed, 28 insertions, 14 deletions
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index 03614302..8f3ed3f5 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/caching"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/internal"
@@ -39,6 +40,7 @@ import (
type messagesReq struct {
ctx context.Context
db storage.Database
+ snapshot storage.DatabaseTransaction
rsAPI api.SyncRoomserverAPI
cfg *config.SyncAPI
roomID string
@@ -70,6 +72,16 @@ func OnIncomingMessagesRequest(
) util.JSONResponse {
var err error
+ // NewDatabaseTransaction is used here instead of NewDatabaseSnapshot as we
+ // expect to be able to write to the database in response to a /messages
+ // request that requires backfilling from the roomserver or federation.
+ snapshot, err := db.NewDatabaseTransaction(req.Context())
+ if err != nil {
+ return jsonerror.InternalServerError()
+ }
+ var succeeded bool
+ defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err)
+
// check if the user has already forgotten about this room
isForgotten, roomExists, err := checkIsRoomForgotten(req.Context(), roomID, device.UserID, rsAPI)
if err != nil {
@@ -132,7 +144,7 @@ func OnIncomingMessagesRequest(
}
} else {
fromStream = &streamToken
- from, err = db.StreamToTopologicalPosition(req.Context(), roomID, streamToken.PDUPosition, backwardOrdering)
+ from, err = snapshot.StreamToTopologicalPosition(req.Context(), roomID, streamToken.PDUPosition, backwardOrdering)
if err != nil {
logrus.WithError(err).Errorf("Failed to get topological position for streaming token %v", streamToken)
return jsonerror.InternalServerError()
@@ -154,7 +166,7 @@ func OnIncomingMessagesRequest(
JSON: jsonerror.InvalidArgumentValue("Invalid to parameter: " + err.Error()),
}
} else {
- to, err = db.StreamToTopologicalPosition(req.Context(), roomID, streamToken.PDUPosition, !backwardOrdering)
+ to, err = snapshot.StreamToTopologicalPosition(req.Context(), roomID, streamToken.PDUPosition, !backwardOrdering)
if err != nil {
logrus.WithError(err).Errorf("Failed to get topological position for streaming token %v", streamToken)
return jsonerror.InternalServerError()
@@ -165,7 +177,7 @@ func OnIncomingMessagesRequest(
// If "to" isn't provided, it defaults to either the earliest stream
// position (if we're going backward) or to the latest one (if we're
// going forward).
- to, err = setToDefault(req.Context(), db, backwardOrdering, roomID)
+ to, err = setToDefault(req.Context(), snapshot, backwardOrdering, roomID)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("setToDefault failed")
return jsonerror.InternalServerError()
@@ -186,6 +198,7 @@ func OnIncomingMessagesRequest(
mReq := messagesReq{
ctx: req.Context(),
db: db,
+ snapshot: snapshot,
rsAPI: rsAPI,
cfg: cfg,
roomID: roomID,
@@ -217,7 +230,7 @@ func OnIncomingMessagesRequest(
Start: start.String(),
End: end.String(),
}
- res.applyLazyLoadMembers(req.Context(), db, roomID, device, filter.LazyLoadMembers, lazyLoadCache)
+ res.applyLazyLoadMembers(req.Context(), snapshot, roomID, device, filter.LazyLoadMembers, lazyLoadCache)
// If we didn't return any events, set the end to an empty string, so it will be omitted
// in the response JSON.
@@ -229,6 +242,7 @@ func OnIncomingMessagesRequest(
}
// Respond with the events.
+ succeeded = true
return util.JSONResponse{
Code: http.StatusOK,
JSON: res,
@@ -239,7 +253,7 @@ func OnIncomingMessagesRequest(
// LazyLoadMembers enabled.
func (m *messagesResp) applyLazyLoadMembers(
ctx context.Context,
- db storage.Database,
+ db storage.DatabaseTransaction,
roomID string,
device *userapi.Device,
lazyLoad bool,
@@ -292,7 +306,7 @@ func (r *messagesReq) retrieveEvents() (
end types.TopologyToken, err error,
) {
// Retrieve the events from the local database.
- streamEvents, err := r.db.GetEventsInTopologicalRange(r.ctx, r.from, r.to, r.roomID, r.filter, r.backwardOrdering)
+ streamEvents, err := r.snapshot.GetEventsInTopologicalRange(r.ctx, r.from, r.to, r.roomID, r.filter, r.backwardOrdering)
if err != nil {
err = fmt.Errorf("GetEventsInRange: %w", err)
return
@@ -348,7 +362,7 @@ func (r *messagesReq) retrieveEvents() (
// Apply room history visibility filter
startTime := time.Now()
- filteredEvents, err := internal.ApplyHistoryVisibilityFilter(r.ctx, r.db, r.rsAPI, events, nil, r.device.UserID, "messages")
+ filteredEvents, err := internal.ApplyHistoryVisibilityFilter(r.ctx, r.snapshot, r.rsAPI, events, nil, r.device.UserID, "messages")
logrus.WithFields(logrus.Fields{
"duration": time.Since(startTime),
"room_id": r.roomID,
@@ -366,7 +380,7 @@ func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (st
// else to go. This seems to fix Element iOS from looping on /messages endlessly.
end = types.TopologyToken{}
} else {
- end, err = r.db.EventPositionInTopology(
+ end, err = r.snapshot.EventPositionInTopology(
r.ctx, events[0].EventID(),
)
// A stream/topological position is a cursor located between two events.
@@ -378,7 +392,7 @@ func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (st
}
} else {
start = *r.from
- end, err = r.db.EventPositionInTopology(
+ end, err = r.snapshot.EventPositionInTopology(
r.ctx, events[len(events)-1].EventID(),
)
}
@@ -399,7 +413,7 @@ func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (st
func (r *messagesReq) handleEmptyEventsSlice() (
events []*gomatrixserverlib.HeaderedEvent, err error,
) {
- backwardExtremities, err := r.db.BackwardExtremitiesForRoom(r.ctx, r.roomID)
+ backwardExtremities, err := r.snapshot.BackwardExtremitiesForRoom(r.ctx, r.roomID)
// Check if we have backward extremities for this room.
if len(backwardExtremities) > 0 {
@@ -443,7 +457,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
}
// Check if the slice contains a backward extremity.
- backwardExtremities, err := r.db.BackwardExtremitiesForRoom(r.ctx, r.roomID)
+ backwardExtremities, err := r.snapshot.BackwardExtremitiesForRoom(r.ctx, r.roomID)
if err != nil {
return
}
@@ -463,7 +477,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
}
// Append the events ve previously retrieved locally.
- events = append(events, r.db.StreamEventsToEvents(nil, streamEvents)...)
+ events = append(events, r.snapshot.StreamEventsToEvents(nil, streamEvents)...)
sort.Sort(eventsByDepth(events))
return
@@ -553,7 +567,7 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
// Returns an error if there was an issue with retrieving the latest position
// from the database
func setToDefault(
- ctx context.Context, db storage.Database, backwardOrdering bool,
+ ctx context.Context, snapshot storage.DatabaseTransaction, backwardOrdering bool,
roomID string,
) (to types.TopologyToken, err error) {
if backwardOrdering {
@@ -561,7 +575,7 @@ func setToDefault(
// this is because Database.GetEventsInTopologicalRange is exclusive of the lower-bound.
to = types.TopologyToken{}
} else {
- to, err = db.MaxTopologicalPosition(ctx, roomID)
+ to, err = snapshot.MaxTopologicalPosition(ctx, roomID)
}
return