aboutsummaryrefslogtreecommitdiff
path: root/syncapi/routing
diff options
context:
space:
mode:
authorS7evinK <2353100+S7evinK@users.noreply.github.com>2022-03-01 15:39:56 +0100
committerGitHub <noreply@github.com>2022-03-01 14:39:56 +0000
commitaf610df85a6a2145d5ddcd6419bb8085ae224207 (patch)
tree79d103fcd4857214eb0e52ef66414874f328df60 /syncapi/routing
parent471fda810a24f8945f5da3a76d2730a1ffe80166 (diff)
Return state on calls to /message and lazy load members (#2218)
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'syncapi/routing')
-rw-r--r--syncapi/routing/context.go4
-rw-r--r--syncapi/routing/context_test.go6
-rw-r--r--syncapi/routing/messages.go61
3 files changed, 41 insertions, 30 deletions
diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go
index ef7efa2b..59113971 100644
--- a/syncapi/routing/context.go
+++ b/syncapi/routing/context.go
@@ -44,7 +44,7 @@ func Context(
syncDB storage.Database,
roomID, eventID string,
) util.JSONResponse {
- filter, err := parseContextParams(req)
+ filter, err := parseRoomEventFilter(req)
if err != nil {
errMsg := ""
switch err.(type) {
@@ -164,7 +164,7 @@ func applyLazyLoadMembers(filter *gomatrixserverlib.RoomEventFilter, eventsAfter
return newState
}
-func parseContextParams(req *http.Request) (*gomatrixserverlib.RoomEventFilter, error) {
+func parseRoomEventFilter(req *http.Request) (*gomatrixserverlib.RoomEventFilter, error) {
// Default room filter
filter := &gomatrixserverlib.RoomEventFilter{Limit: 10}
diff --git a/syncapi/routing/context_test.go b/syncapi/routing/context_test.go
index 1b430d83..e79a5d5f 100644
--- a/syncapi/routing/context_test.go
+++ b/syncapi/routing/context_test.go
@@ -55,13 +55,13 @@ func Test_parseContextParams(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- gotFilter, err := parseContextParams(tt.req)
+ gotFilter, err := parseRoomEventFilter(tt.req)
if (err != nil) != tt.wantErr {
- t.Errorf("parseContextParams() error = %v, wantErr %v", err, tt.wantErr)
+ t.Errorf("parseRoomEventFilter() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotFilter, tt.wantFilter) {
- t.Errorf("parseContextParams() gotFilter = %v, want %v", gotFilter, tt.wantFilter)
+ t.Errorf("parseRoomEventFilter() gotFilter = %v, want %v", gotFilter, tt.wantFilter)
}
})
}
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index 9bb8c6d2..7cd54eef 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -19,7 +19,6 @@ import (
"fmt"
"net/http"
"sort"
- "strconv"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -45,8 +44,8 @@ type messagesReq struct {
fromStream *types.StreamingToken
device *userapi.Device
wasToProvided bool
- limit int
backwardOrdering bool
+ filter *gomatrixserverlib.RoomEventFilter
}
type messagesResp struct {
@@ -54,10 +53,9 @@ type messagesResp struct {
StartStream string `json:"start_stream,omitempty"` // NOTSPEC: so clients can hit /messages then immediately /sync with a latest sync token
End string `json:"end"`
Chunk []gomatrixserverlib.ClientEvent `json:"chunk"`
+ State []gomatrixserverlib.ClientEvent `json:"state"`
}
-const defaultMessagesLimit = 10
-
// OnIncomingMessagesRequest implements the /messages endpoint from the
// client-server API.
// See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages
@@ -83,6 +81,14 @@ func OnIncomingMessagesRequest(
}
}
+ filter, err := parseRoomEventFilter(req)
+ if err != nil {
+ return util.JSONResponse{
+ Code: http.StatusBadRequest,
+ JSON: jsonerror.InvalidArgumentValue("unable to parse filter"),
+ }
+ }
+
// Extract parameters from the request's URL.
// Pagination tokens.
var fromStream *types.StreamingToken
@@ -143,18 +149,6 @@ func OnIncomingMessagesRequest(
wasToProvided = false
}
- // Maximum number of events to return; defaults to 10.
- limit := defaultMessagesLimit
- if len(req.URL.Query().Get("limit")) > 0 {
- limit, err = strconv.Atoi(req.URL.Query().Get("limit"))
-
- if err != nil {
- return util.JSONResponse{
- Code: http.StatusBadRequest,
- JSON: jsonerror.InvalidArgumentValue("limit could not be parsed into an integer: " + err.Error()),
- }
- }
- }
// TODO: Implement filtering (#587)
// Check the room ID's format.
@@ -176,7 +170,7 @@ func OnIncomingMessagesRequest(
to: &to,
fromStream: fromStream,
wasToProvided: wasToProvided,
- limit: limit,
+ filter: filter,
backwardOrdering: backwardOrdering,
device: device,
}
@@ -187,10 +181,27 @@ func OnIncomingMessagesRequest(
return jsonerror.InternalServerError()
}
+ // at least fetch the membership events for the users returned in chunk if LazyLoadMembers is set
+ state := []gomatrixserverlib.ClientEvent{}
+ if filter.LazyLoadMembers {
+ memberShipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent)
+ for _, evt := range clientEvents {
+ memberShip, err := db.GetStateEvent(req.Context(), roomID, gomatrixserverlib.MRoomMember, evt.Sender)
+ if err != nil {
+ util.GetLogger(req.Context()).WithError(err).Error("failed to get membership event for user")
+ continue
+ }
+ memberShipToUser[evt.Sender] = memberShip
+ }
+ for _, evt := range memberShipToUser {
+ state = append(state, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatAll))
+ }
+ }
+
util.GetLogger(req.Context()).WithFields(logrus.Fields{
"from": from.String(),
"to": to.String(),
- "limit": limit,
+ "limit": filter.Limit,
"backwards": backwardOrdering,
"return_start": start.String(),
"return_end": end.String(),
@@ -200,6 +211,7 @@ func OnIncomingMessagesRequest(
Chunk: clientEvents,
Start: start.String(),
End: end.String(),
+ State: state,
}
if emptyFromSupplied {
res.StartStream = fromStream.String()
@@ -234,19 +246,18 @@ func (r *messagesReq) retrieveEvents() (
clientEvents []gomatrixserverlib.ClientEvent, start,
end types.TopologyToken, err error,
) {
- eventFilter := gomatrixserverlib.DefaultRoomEventFilter()
- eventFilter.Limit = r.limit
+ eventFilter := r.filter
// Retrieve the events from the local database.
var streamEvents []types.StreamEvent
if r.fromStream != nil {
toStream := r.to.StreamToken()
streamEvents, err = r.db.GetEventsInStreamingRange(
- r.ctx, r.fromStream, &toStream, r.roomID, &eventFilter, r.backwardOrdering,
+ r.ctx, r.fromStream, &toStream, r.roomID, eventFilter, r.backwardOrdering,
)
} else {
streamEvents, err = r.db.GetEventsInTopologicalRange(
- r.ctx, r.from, r.to, r.roomID, r.limit, r.backwardOrdering,
+ r.ctx, r.from, r.to, r.roomID, eventFilter.Limit, r.backwardOrdering,
)
}
if err != nil {
@@ -434,7 +445,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
// Check if we have backward extremities for this room.
if len(backwardExtremities) > 0 {
// If so, retrieve as much events as needed through backfilling.
- events, err = r.backfill(r.roomID, backwardExtremities, r.limit)
+ events, err = r.backfill(r.roomID, backwardExtremities, r.filter.Limit)
if err != nil {
return
}
@@ -456,7 +467,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
events []*gomatrixserverlib.HeaderedEvent, err error,
) {
// Check if we have enough events.
- isSetLargeEnough := len(streamEvents) >= r.limit
+ isSetLargeEnough := len(streamEvents) >= r.filter.Limit
if !isSetLargeEnough {
// it might be fine we don't have up to 'limit' events, let's find out
if r.backwardOrdering {
@@ -483,7 +494,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering {
var pdus []*gomatrixserverlib.HeaderedEvent
// Only ask the remote server for enough events to reach the limit.
- pdus, err = r.backfill(r.roomID, backwardExtremities, r.limit-len(streamEvents))
+ pdus, err = r.backfill(r.roomID, backwardExtremities, r.filter.Limit-len(streamEvents))
if err != nil {
return
}