aboutsummaryrefslogtreecommitdiff
path: root/syncapi/routing/messages.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/routing/messages.go')
-rw-r--r--syncapi/routing/messages.go105
1 files changed, 57 insertions, 48 deletions
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index c3871618..23a09544 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -53,6 +53,7 @@ type messagesReq struct {
wasToProvided bool
backwardOrdering bool
filter *synctypes.RoomEventFilter
+ didBackfill bool
}
type messagesResp struct {
@@ -251,18 +252,19 @@ func OnIncomingMessagesRequest(
}
// If start and end are equal, we either reached the beginning or something else
- // is wrong. To avoid endless loops from clients, set end to 0 an empty string
- if start == end {
+ // is wrong. If we have nothing to return set end to 0.
+ if start == end || len(clientEvents) == 0 {
end = types.TopologyToken{}
}
util.GetLogger(req.Context()).WithFields(logrus.Fields{
- "from": from.String(),
- "to": to.String(),
- "limit": filter.Limit,
- "backwards": backwardOrdering,
- "return_start": start.String(),
- "return_end": end.String(),
+ "request_from": from.String(),
+ "request_to": to.String(),
+ "limit": filter.Limit,
+ "backwards": backwardOrdering,
+ "response_start": start.String(),
+ "response_end": end.String(),
+ "backfilled": mReq.didBackfill,
}).Info("Responding")
res := messagesResp{
@@ -284,11 +286,6 @@ func OnIncomingMessagesRequest(
})...)
}
- // If we didn't return any events, set the end to an empty string, so it will be omitted
- // in the response JSON.
- if len(res.Chunk) == 0 {
- res.End = ""
- }
if fromStream != nil {
res.StartStream = fromStream.String()
}
@@ -328,11 +325,12 @@ func (r *messagesReq) retrieveEvents(ctx context.Context, rsAPI api.SyncRoomserv
) {
emptyToken := types.TopologyToken{}
// Retrieve the events from the local database.
- streamEvents, err := r.snapshot.GetEventsInTopologicalRange(r.ctx, r.from, r.to, r.roomID, r.filter, r.backwardOrdering)
+ streamEvents, _, end, err := r.snapshot.GetEventsInTopologicalRange(r.ctx, r.from, r.to, r.roomID, r.filter, r.backwardOrdering)
if err != nil {
err = fmt.Errorf("GetEventsInRange: %w", err)
- return []synctypes.ClientEvent{}, emptyToken, emptyToken, err
+ return []synctypes.ClientEvent{}, *r.from, emptyToken, err
}
+ end.Decrement()
var events []*rstypes.HeaderedEvent
util.GetLogger(r.ctx).WithFields(logrus.Fields{
@@ -346,32 +344,54 @@ func (r *messagesReq) retrieveEvents(ctx context.Context, rsAPI api.SyncRoomserv
// on the ordering), or we've reached a backward extremity.
if len(streamEvents) == 0 {
if events, err = r.handleEmptyEventsSlice(); err != nil {
- return []synctypes.ClientEvent{}, emptyToken, emptyToken, err
+ return []synctypes.ClientEvent{}, *r.from, emptyToken, err
}
} else {
if events, err = r.handleNonEmptyEventsSlice(streamEvents); err != nil {
- return []synctypes.ClientEvent{}, emptyToken, emptyToken, err
+ return []synctypes.ClientEvent{}, *r.from, emptyToken, err
}
}
// If we didn't get any event, we don't need to proceed any further.
if len(events) == 0 {
- return []synctypes.ClientEvent{}, *r.from, *r.to, nil
+ return []synctypes.ClientEvent{}, *r.from, emptyToken, nil
}
- // Get the position of the first and the last event in the room's topology.
- // This position is currently determined by the event's depth, so we could
- // also use it instead of retrieving from the database. However, if we ever
- // change the way topological positions are defined (as depth isn't the most
- // reliable way to define it), it would be easier and less troublesome to
- // only have to change it in one place, i.e. the database.
- start, end, err = r.getStartEnd(events)
+ // Apply room history visibility filter
+ startTime := time.Now()
+ filteredEvents, err := internal.ApplyHistoryVisibilityFilter(r.ctx, r.snapshot, r.rsAPI, events, nil, r.device.UserID, "messages")
if err != nil {
- return []synctypes.ClientEvent{}, *r.from, *r.to, err
+ return []synctypes.ClientEvent{}, *r.from, *r.to, nil
+ }
+ logrus.WithFields(logrus.Fields{
+ "duration": time.Since(startTime),
+ "room_id": r.roomID,
+ "events_before": len(events),
+ "events_after": len(filteredEvents),
+ }).Debug("applied history visibility (messages)")
+
+ // No events left after applying history visibility
+ if len(filteredEvents) == 0 {
+ return []synctypes.ClientEvent{}, *r.from, emptyToken, nil
+ }
+
+ // If we backfilled in the process of getting events, we need
+ // to re-fetch the start/end positions
+ if r.didBackfill {
+ _, end, err = r.getStartEnd(filteredEvents)
+ if err != nil {
+ return []synctypes.ClientEvent{}, *r.from, *r.to, err
+ }
}
// Sort the events to ensure we send them in the right order.
if r.backwardOrdering {
+ if events[len(events)-1].Type() == spec.MRoomCreate {
+ // NOTSPEC: We've hit the beginning of the room so there's really nowhere
+ // else to go. This seems to fix Element iOS from looping on /messages endlessly.
+ end = types.TopologyToken{}
+ }
+
// This reverses the array from old->new to new->old
reversed := func(in []*rstypes.HeaderedEvent) []*rstypes.HeaderedEvent {
out := make([]*rstypes.HeaderedEvent, len(in))
@@ -380,24 +400,14 @@ func (r *messagesReq) retrieveEvents(ctx context.Context, rsAPI api.SyncRoomserv
}
return out
}
- events = reversed(events)
- }
- if len(events) == 0 {
- return []synctypes.ClientEvent{}, *r.from, *r.to, nil
+ filteredEvents = reversed(filteredEvents)
}
- // Apply room history visibility filter
- startTime := time.Now()
- filteredEvents, err := internal.ApplyHistoryVisibilityFilter(r.ctx, r.snapshot, r.rsAPI, events, nil, r.device.UserID, "messages")
- logrus.WithFields(logrus.Fields{
- "duration": time.Since(startTime),
- "room_id": r.roomID,
- "events_before": len(events),
- "events_after": len(filteredEvents),
- }).Debug("applied history visibility (messages)")
+ start = *r.from
+
return synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(filteredEvents), synctypes.FormatAll, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
- }), start, end, err
+ }), start, end, nil
}
func (r *messagesReq) getStartEnd(events []*rstypes.HeaderedEvent) (start, end types.TopologyToken, err error) {
@@ -450,6 +460,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
if err != nil {
return
}
+ r.didBackfill = true
} else {
// If not, it means the slice was empty because we reached the room's
// creation, so return an empty slice.
@@ -499,7 +510,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
if err != nil {
return
}
-
+ r.didBackfill = true
// Append the PDUs to the list to send back to the client.
events = append(events, pdus...)
}
@@ -561,15 +572,17 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
if res.HistoryVisibility == "" {
res.HistoryVisibility = gomatrixserverlib.HistoryVisibilityShared
}
- for i := range res.Events {
+ events := res.Events
+ for i := range events {
+ events[i].Visibility = res.HistoryVisibility
_, err = r.db.WriteEvent(
context.Background(),
- res.Events[i],
+ events[i],
[]*rstypes.HeaderedEvent{},
[]string{},
[]string{},
nil, true,
- res.HistoryVisibility,
+ events[i].Visibility,
)
if err != nil {
return nil, err
@@ -577,14 +590,10 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
}
// we may have got more than the requested limit so resize now
- events := res.Events
if len(events) > limit {
// last `limit` events
events = events[len(events)-limit:]
}
- for _, ev := range events {
- ev.Visibility = res.HistoryVisibility
- }
return events, nil
}