diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-01-23 17:51:10 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-23 17:51:10 +0000 |
commit | 49f760a30b6496c8b3e1ceaf98dccc4376f6605d (patch) | |
tree | b00d3fc17144cc83df1e5c7b8d1080ca19041243 /syncapi/routing | |
parent | 43ecf8d1f909f4eb71bba93f6e7a57db59ec5941 (diff) |
CS API: Support for /messages, fixes for /sync (#847)
* Merge forward
* Tidy up a bit
* TODO: What to do with NextBatch here?
* Replace SyncPosition with PaginationToken throughout syncapi
* Fix PaginationTokens
* Fix lint errors
* Add a couple of missing functions into the syncapi external storage interface
* Some updates based on review comments from @babolivier
* Some updates based on review comments from @babolivier
* argh whitespacing
* Fix opentracing span
* Remove dead code
* Don't overshadow err (fix lint issue)
* Handle extremities after inserting event into topology
* Try insert event topology as ON CONFLICT DO NOTHING
* Prevent OOB error in addRoomDeltaToResponse
* Thwarted by gocyclo again
* Fix NewPaginationTokenFromString, define unit test for it
* Update pagination token test
* Update sytest-whitelist
* Hopefully fix some of the sync batch tokens
* Remove extraneous sync position func
* Revert to topology tokens in addRoomDeltaToResponse etc
* Fix typo
* Remove prevPDUPos as dead now that backwardTopologyPos is used instead
* Fix selectEventsWithEventIDsSQL
* Update sytest-blacklist
* Update sytest-whitelist
Diffstat (limited to 'syncapi/routing')
-rw-r--r-- | syncapi/routing/messages.go | 482 | ||||
-rw-r--r-- | syncapi/routing/routing.go | 18 |
2 files changed, 499 insertions, 1 deletions
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go new file mode 100644 index 00000000..26f48ca4 --- /dev/null +++ b/syncapi/routing/messages.go @@ -0,0 +1,482 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "context" + "net/http" + "sort" + "strconv" + + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + log "github.com/sirupsen/logrus" +) + +type messagesReq struct { + ctx context.Context + db storage.Database + queryAPI api.RoomserverQueryAPI + federation *gomatrixserverlib.FederationClient + cfg *config.Dendrite + roomID string + from *types.PaginationToken + to *types.PaginationToken + wasToProvided bool + limit int + backwardOrdering bool +} + +type messagesResp struct { + Start string `json:"start"` + End string `json:"end"` + Chunk []gomatrixserverlib.ClientEvent `json:"chunk"` +} + +const defaultMessagesLimit = 10 + +// OnIncomingMessagesRequest implements the /messages endpoint from the +// client-server API. +// See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages +func OnIncomingMessagesRequest( + req *http.Request, db storage.Database, roomID string, + federation *gomatrixserverlib.FederationClient, + queryAPI api.RoomserverQueryAPI, + cfg *config.Dendrite, +) util.JSONResponse { + var err error + + // Extract parameters from the request's URL. + // Pagination tokens. + from, err := types.NewPaginationTokenFromString(req.URL.Query().Get("from")) + if err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidArgumentValue("Invalid from parameter: " + err.Error()), + } + } + + // Direction to return events from. + dir := req.URL.Query().Get("dir") + if dir != "b" && dir != "f" { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.MissingArgument("Bad or missing dir query parameter (should be either 'b' or 'f')"), + } + } + // A boolean is easier to handle in this case, especially since dir is sure + // to have one of the two accepted values (so dir == "f" <=> !backwardOrdering). + backwardOrdering := (dir == "b") + + // Pagination tokens. To is optional, and its default value depends on the + // direction ("b" or "f"). + var to *types.PaginationToken + wasToProvided := true + if s := req.URL.Query().Get("to"); len(s) > 0 { + to, err = types.NewPaginationTokenFromString(s) + if err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidArgumentValue("Invalid to parameter: " + err.Error()), + } + } + } else { + // If "to" isn't provided, it defaults to either the earliest stream + // position (if we're going backward) or to the latest one (if we're + // going forward). + to, err = setToDefault(req.Context(), db, backwardOrdering, roomID) + if err != nil { + return httputil.LogThenError(req, err) + } + wasToProvided = false + } + + // Maximum number of events to return; defaults to 10. + limit := defaultMessagesLimit + if len(req.URL.Query().Get("limit")) > 0 { + limit, err = strconv.Atoi(req.URL.Query().Get("limit")) + + if err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidArgumentValue("limit could not be parsed into an integer: " + err.Error()), + } + } + } + // TODO: Implement filtering (#587) + + // Check the room ID's format. + if _, _, err = gomatrixserverlib.SplitID('!', roomID); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.MissingArgument("Bad room ID: " + err.Error()), + } + } + + mReq := messagesReq{ + ctx: req.Context(), + db: db, + queryAPI: queryAPI, + federation: federation, + cfg: cfg, + roomID: roomID, + from: from, + to: to, + wasToProvided: wasToProvided, + limit: limit, + backwardOrdering: backwardOrdering, + } + + clientEvents, start, end, err := mReq.retrieveEvents() + if err != nil { + return httputil.LogThenError(req, err) + } + + // Respond with the events. + return util.JSONResponse{ + Code: http.StatusOK, + JSON: messagesResp{ + Chunk: clientEvents, + Start: start.String(), + End: end.String(), + }, + } +} + +// retrieveEvents retrieve events from the local database for a request on +// /messages. If there's not enough events to retrieve, it asks another +// homeserver in the room for older events. +// Returns an error if there was an issue talking to the database or with the +// remote homeserver. +func (r *messagesReq) retrieveEvents() ( + clientEvents []gomatrixserverlib.ClientEvent, start, + end *types.PaginationToken, err error, +) { + // Retrieve the events from the local database. + streamEvents, err := r.db.GetEventsInRange( + r.ctx, r.from, r.to, r.roomID, r.limit, r.backwardOrdering, + ) + if err != nil { + return + } + + var events []gomatrixserverlib.Event + + // There can be two reasons for streamEvents to be empty: either we've + // reached the oldest event in the room (or the most recent one, depending + // on the ordering), or we've reached a backward extremity. + if len(streamEvents) == 0 { + if events, err = r.handleEmptyEventsSlice(); err != nil { + return + } + } else { + if events, err = r.handleNonEmptyEventsSlice(streamEvents); err != nil { + return + } + } + + // If we didn't get any event, we don't need to proceed any further. + if len(events) == 0 { + return []gomatrixserverlib.ClientEvent{}, r.from, r.to, nil + } + + // Sort the events to ensure we send them in the right order. We currently + // do that based on the event's timestamp. + if r.backwardOrdering { + sort.SliceStable(events, func(i int, j int) bool { + // Backward ordering is antichronological (latest event to oldest + // one). + return sortEvents(&(events[j]), &(events[i])) + }) + } else { + sort.SliceStable(events, func(i int, j int) bool { + // Forward ordering is chronological (oldest event to latest one). + return sortEvents(&(events[i]), &(events[j])) + }) + } + + // Convert all of the events into client events. + clientEvents = gomatrixserverlib.ToClientEvents(events, gomatrixserverlib.FormatAll) + // Get the position of the first and the last event in the room's topology. + // This position is currently determined by the event's depth, so we could + // also use it instead of retrieving from the database. However, if we ever + // change the way topological positions are defined (as depth isn't the most + // reliable way to define it), it would be easier and less troublesome to + // only have to change it in one place, i.e. the database. + startPos, err := r.db.EventPositionInTopology( + r.ctx, streamEvents[0].EventID(), + ) + if err != nil { + return + } + endPos, err := r.db.EventPositionInTopology( + r.ctx, streamEvents[len(streamEvents)-1].EventID(), + ) + if err != nil { + return + } + // Generate pagination tokens to send to the client using the positions + // retrieved previously. + start = types.NewPaginationTokenFromTypeAndPosition( + types.PaginationTokenTypeTopology, startPos, 0, + ) + end = types.NewPaginationTokenFromTypeAndPosition( + types.PaginationTokenTypeTopology, endPos, 0, + ) + + if r.backwardOrdering { + // A stream/topological position is a cursor located between two events. + // While they are identified in the code by the event on their right (if + // we consider a left to right chronological order), tokens need to refer + // to them by the event on their left, therefore we need to decrement the + // end position we send in the response if we're going backward. + end.PDUPosition-- + } + + // The lowest token value is 1, therefore we need to manually set it to that + // value if we're below it. + if end.PDUPosition < types.StreamPosition(1) { + end.PDUPosition = types.StreamPosition(1) + } + + return clientEvents, start, end, err +} + +// handleEmptyEventsSlice handles the case where the initial request to the +// database returned an empty slice of events. It does so by checking whether +// the set is empty because we've reached a backward extremity, and if that is +// the case, by retrieving as much events as requested by backfilling from +// another homeserver. +// Returns an error if there was an issue talking with the database or +// backfilling. +func (r *messagesReq) handleEmptyEventsSlice() ( + events []gomatrixserverlib.Event, err error, +) { + backwardExtremities, err := r.db.BackwardExtremitiesForRoom(r.ctx, r.roomID) + + // Check if we have backward extremities for this room. + if len(backwardExtremities) > 0 { + // If so, retrieve as much events as needed through backfilling. + events, err = r.backfill(backwardExtremities, r.limit) + if err != nil { + return + } + } else { + // If not, it means the slice was empty because we reached the room's + // creation, so return an empty slice. + events = []gomatrixserverlib.Event{} + } + + return +} + +// handleNonEmptyEventsSlice handles the case where the initial request to the +// database returned a non-empty slice of events. It does so by checking whether +// events are missing from the expected result, and retrieve missing events +// through backfilling if needed. +// Returns an error if there was an issue while backfilling. +func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent) ( + events []gomatrixserverlib.Event, err error, +) { + // Check if we have enough events. + isSetLargeEnough := true + if len(streamEvents) < r.limit { + if r.backwardOrdering { + if r.wasToProvided { + // The condition in the SQL query is a strict "greater than" so + // we need to check against to-1. + streamPos := types.StreamPosition(streamEvents[len(streamEvents)-1].StreamPosition) + isSetLargeEnough = (r.to.PDUPosition-1 == streamPos) + } + } else { + streamPos := types.StreamPosition(streamEvents[0].StreamPosition) + isSetLargeEnough = (r.from.PDUPosition-1 == streamPos) + } + } + + // Check if the slice contains a backward extremity. + backwardExtremities, err := r.db.BackwardExtremitiesForRoom(r.ctx, r.roomID) + if err != nil { + return + } + + // Backfill is needed if we've reached a backward extremity and need more + // events. It's only needed if the direction is backward. + if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering { + var pdus []gomatrixserverlib.Event + // Only ask the remote server for enough events to reach the limit. + pdus, err = r.backfill(backwardExtremities, r.limit-len(streamEvents)) + if err != nil { + return + } + + // Append the PDUs to the list to send back to the client. + events = append(events, pdus...) + } + + // Append the events ve previously retrieved locally. + events = append(events, r.db.StreamEventsToEvents(nil, streamEvents)...) + + return +} + +// containsBackwardExtremity checks if a slice of StreamEvent contains a +// backward extremity. It does so by selecting the earliest event in the slice +// and by checking the presence in the database of all of its parent events, and +// considers the event itself a backward extremity if at least one of the parent +// events doesn't exist in the database. +// Returns an error if there was an issue with talking to the database. +func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (bool, error) { + // Select the earliest retrieved event. + var ev *types.StreamEvent + if r.backwardOrdering { + ev = &(events[len(events)-1]) + } else { + ev = &(events[0]) + } + // Get the earliest retrieved event's parents. + prevIDs := ev.PrevEventIDs() + prevs, err := r.db.Events(r.ctx, prevIDs) + if err != nil { + return false, nil + } + // Check if we have all of the events we requested. If not, it means we've + // reached a backward extremity. + var eventInDB bool + var id string + // Iterate over the IDs we used in the request. + for _, id = range prevIDs { + eventInDB = false + // Iterate over the events we got in response. + for _, ev := range prevs { + if ev.EventID() == id { + eventInDB = true + } + } + // One occurrence of one the event's parents not being present in the + // database is enough to say that the event is a backward extremity. + if !eventInDB { + return true, nil + } + } + + return false, nil +} + +// backfill performs a backfill request over the federation on another +// homeserver in the room. +// See: https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid +// It also stores the PDUs retrieved from the remote homeserver's response to +// the database. +// Returns with an empty string if the remote homeserver didn't return with any +// event, or if there is no remote homeserver to contact. +// Returns an error if there was an issue with retrieving the list of servers in +// the room or sending the request. +func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.Event, error) { + // Query the list of servers in the room when one of the backward extremities + // was sent. + var serversResponse api.QueryServersInRoomAtEventResponse + serversRequest := api.QueryServersInRoomAtEventRequest{ + RoomID: r.roomID, + EventID: fromEventIDs[0], + } + if err := r.queryAPI.QueryServersInRoomAtEvent(r.ctx, &serversRequest, &serversResponse); err != nil { + return nil, err + } + + // Use the first server from the response, except if that server is us. + // In that case, use the second one if the roomserver responded with + // enough servers. If not, use an empty string to prevent the backfill + // from happening as there's no server to direct the request towards. + // TODO: Be smarter at selecting the server to direct the request + // towards. + srvToBackfillFrom := serversResponse.Servers[0] + if srvToBackfillFrom == r.cfg.Matrix.ServerName { + if len(serversResponse.Servers) > 1 { + srvToBackfillFrom = serversResponse.Servers[1] + } else { + srvToBackfillFrom = gomatrixserverlib.ServerName("") + log.Warn("Not enough servers to backfill from") + } + } + + pdus := make([]gomatrixserverlib.Event, 0) + + // If the roomserver responded with at least one server that isn't us, + // send it a request for backfill. + if len(srvToBackfillFrom) > 0 { + txn, err := r.federation.Backfill( + r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs, + ) + if err != nil { + return nil, err + } + + pdus = txn.PDUs + + // Store the events in the database, while marking them as unfit to show + // up in responses to sync requests. + for _, pdu := range pdus { + if _, err = r.db.WriteEvent( + r.ctx, &pdu, []gomatrixserverlib.Event{}, []string{}, []string{}, + nil, true, + ); err != nil { + return nil, err + } + } + } + + return pdus, nil +} + +// setToDefault returns the default value for the "to" query parameter of a +// request to /messages if not provided. It defaults to either the earliest +// topological position (if we're going backward) or to the latest one (if we're +// going forward). +// Returns an error if there was an issue with retrieving the latest position +// from the database +func setToDefault( + ctx context.Context, db storage.Database, backwardOrdering bool, + roomID string, +) (to *types.PaginationToken, err error) { + if backwardOrdering { + to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 1, 0) + } else { + var pos types.StreamPosition + pos, err = db.MaxTopologicalPosition(ctx, roomID) + if err != nil { + return + } + + to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, pos, 0) + } + + return +} + +// sortEvents is a function to give to sort.SliceStable, and compares the +// timestamp of two Matrix events. +// Returns true if the first event happened before the second one, false +// otherwise. +func sortEvents(e1 *gomatrixserverlib.Event, e2 *gomatrixserverlib.Event) bool { + t := e1.OriginServerTS().Time() + return e2.OriginServerTS().Time().After(t) +} diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go index bd9389bd..8916565d 100644 --- a/syncapi/routing/routing.go +++ b/syncapi/routing/routing.go @@ -22,8 +22,11 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/sync" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -34,7 +37,12 @@ const pathPrefixR0 = "/_matrix/client/r0" // Due to Setup being used to call many other functions, a gocyclo nolint is // applied: // nolint: gocyclo -func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB storage.Database, deviceDB *devices.Database) { +func Setup( + apiMux *mux.Router, srp *sync.RequestPool, syncDB storage.Database, + deviceDB *devices.Database, federation *gomatrixserverlib.FederationClient, + queryAPI api.RoomserverQueryAPI, + cfg *config.Dendrite, +) { r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() authData := auth.Data{ @@ -71,4 +79,12 @@ func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB storage.Database, d } return OnIncomingStateTypeRequest(req, syncDB, vars["roomID"], vars["type"], vars["stateKey"]) })).Methods(http.MethodGet, http.MethodOptions) + + r0mux.Handle("/rooms/{roomID}/messages", common.MakeAuthAPI("room_messages", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse { + vars, err := common.URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], federation, queryAPI, cfg) + })).Methods(http.MethodGet, http.MethodOptions) } |