diff options
author | ruben <code@rbn.im> | 2019-05-21 22:56:55 +0200 |
---|---|---|
committer | Brendan Abolivier <babolivier@matrix.org> | 2019-05-21 21:56:55 +0100 |
commit | 74827428bd3e11faab65f12204449c1b9469b0ae (patch) | |
tree | 0decafa542436a0667ed2d3e3cfd4df0f03de1e5 /syncapi | |
parent | 4d588f7008afe5600219ac0930c2eee2de5c447b (diff) |
use go module for dependencies (#594)
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/README.md | 86 | ||||
-rw-r--r-- | syncapi/consumers/clientapi.go | 95 | ||||
-rw-r--r-- | syncapi/consumers/roomserver.go | 286 | ||||
-rw-r--r-- | syncapi/routing/routing.go | 61 | ||||
-rw-r--r-- | syncapi/routing/state.go | 118 | ||||
-rw-r--r-- | syncapi/storage/account_data_table.go | 141 | ||||
-rw-r--r-- | syncapi/storage/current_room_state_table.go | 249 | ||||
-rw-r--r-- | syncapi/storage/invites_table.go | 133 | ||||
-rw-r--r-- | syncapi/storage/output_room_events_table.go | 294 | ||||
-rw-r--r-- | syncapi/storage/syncserver.go | 695 | ||||
-rw-r--r-- | syncapi/sync/notifier.go | 243 | ||||
-rw-r--r-- | syncapi/sync/notifier_test.go | 293 | ||||
-rw-r--r-- | syncapi/sync/request.go | 87 | ||||
-rw-r--r-- | syncapi/sync/requestpool.go | 216 | ||||
-rw-r--r-- | syncapi/sync/userstream.go | 162 | ||||
-rw-r--r-- | syncapi/syncapi.go | 75 | ||||
-rw-r--r-- | syncapi/types/types.go | 147 |
17 files changed, 3381 insertions, 0 deletions
diff --git a/syncapi/README.md b/syncapi/README.md new file mode 100644 index 00000000..7221b22d --- /dev/null +++ b/syncapi/README.md @@ -0,0 +1,86 @@ +# Sync API Server + +This server is responsible for servicing `/sync` requests. It gets its data from the room server output log. Currently, the sync server will: + - Return a valid `/sync` response for the user represented by the provided `access_token`. + - Return a "complete sync" if no `since` value is provided, and return a valid `next_batch` token. This contains all rooms the user has been invited to or has joined. For joined rooms, this includes the complete current room state and the most recent 20 (hard-coded) events in the timeline. + - For "incremental syncs" (a `since` value is provided), as you get invited to, join, or leave rooms they will be reflected correctly in the `/sync` response. + - For very large state deltas, the `state` section of a room is correctly populated with the state of the room at the *start* of the timeline. + - When you join a room, the `/sync` which transitions your client to be "joined" will include the complete current room state as per the specification. + - Only wake up user streams it needs to wake up. + - Honours the `timeout` query parameter value. + +## Internals + +When the server gets a `/sync` request, it needs to: + - Work out *which* rooms to return to the client. + - For each room, work out *which* events to return to the client. + +The logic for working out which rooms is based on [Synapse](https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821): + 1) Get the CURRENT joined room list for this user. + 2) Get membership list changes for this user between the provided stream position and now. + 3) For each room which has membership list changes: + - Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins). + If it is, then we need to send the full room state down (and 'limited' is always true). + - Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. + - Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block. + 4) Add joined rooms (joined room list) + +For each room, the /sync response returns the most recent timeline events and the state of the room at the start of the timeline. +The logic for working out *which* events is not based entirely on Synapse code, as it is known broken with respect to working out +room state. In order to know which events to return, the server needs to calculate room state at various points in the history of +the room. For example, imagine a room with the following 15 events (letters are state events (updated via `'`), numbers are timeline events): + +``` +index 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 (1-based indexing as StreamPosition(0) represents no event) +timeline [A, B, C, D, 1, 2, 3, D', 4, D'', 5, B', D''', D'''', 6] +``` + +The current state of this room is: `[A, B', C, D'''']`. + +If this room was requested with `?since=14&limit=5` then 1 timeline event would be returned, the most recent one: +``` + 15 + [ 6 ] +``` + +If this room was requested with `?since=9&limit=5` then 5 timeline events would be returned, the most recent ones: +``` + 11 12 13 14 15 + [5, B', D''', D'''', 6] +``` + +The state of the room at the START of the timeline can be represented in 2 ways: + - The `full_state` from index 0 : `[A, B, C, D'']` (aka the state between 0-11 exclusive) + - A partial state from index 9 : `[D'']` (aka the state between 9-11 exclusive) + +Servers advance state events (e.g from `D'` to `D''`) based on the state conflict resolution algorithm. +You might think that you could advance the current state by just updating the entry for the `(event type, state_key)` tuple +for each state event, but this state can diverge from the state calculated using the state conflict resolution algorithm. +For example, if there are two "simultaneous" updates to the same state key, that is two updates at the same depth in the +event graph, then the final result of the state conflict resolution algorithm might not match the order the events appear +in the timeline. + +The correct advancement for state events is represented by the `AddsStateEventIDs` and `RemovesStateEventIDs` that +are in `OutputRoomEvents` from the room server. + +This version of the sync server uses very simple indexing to calculate room state at various points. +This is inefficient when a very old `since` value is provided, or the `full_state` is requested, as the state delta becomes +very large. This is mitigated slightly with indexes, but better data structures could be used in the future. + +## Known Issues + +- `m.room.history_visibility` is not honoured: it is always treated as "shared". +- All ephemeral events are not implemented (presence, typing, receipts). +- Account data (both user and room) is not implemented. +- `to_device` messages are not implemented. +- Back-pagination via `prev_batch` is not implemented. +- The `limited` flag can lie. +- Filters are not honoured or implemented. The `limit` for each room is hard-coded to 20. +- The `full_state` query parameter is not implemented. +- The `set_presence` query parameter is not implemented. +- "Ignored" users are not ignored. +- Redacted events are still sent to clients. +- Invites over federation (if it existed) won't work as they aren't "real" events and so won't be in the right tables. +- `invite_state` is not implemented (for similar reasons to the above point). +- The current implementation scales badly when a very old `since` token is provided. +- The entire current room state can be re-sent to the client if they send a duplicate "join" event which should be a no-op. diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go new file mode 100644 index 00000000..d05a7692 --- /dev/null +++ b/syncapi/consumers/clientapi.go @@ -0,0 +1,95 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/sync" + log "github.com/sirupsen/logrus" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// OutputClientDataConsumer consumes events that originated in the client API server. +type OutputClientDataConsumer struct { + clientAPIConsumer *common.ContinualConsumer + db *storage.SyncServerDatabase + notifier *sync.Notifier +} + +// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. +func NewOutputClientDataConsumer( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + n *sync.Notifier, + store *storage.SyncServerDatabase, +) *OutputClientDataConsumer { + + consumer := common.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputClientData), + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &OutputClientDataConsumer{ + clientAPIConsumer: &consumer, + db: store, + notifier: n, + } + consumer.ProcessMessage = s.onMessage + + return s +} + +// Start consuming from room servers +func (s *OutputClientDataConsumer) Start() error { + return s.clientAPIConsumer.Start() +} + +// onMessage is called when the sync server receives a new event from the client API server output log. +// It is not safe for this function to be called from multiple goroutines, or else the +// sync stream position may race and be incorrectly calculated. +func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error { + // Parse out the event JSON + var output common.AccountData + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("client API server output log: message parse failure") + return nil + } + + log.WithFields(log.Fields{ + "type": output.Type, + "room_id": output.RoomID, + }).Info("received data from client API server") + + syncStreamPos, err := s.db.UpsertAccountData( + context.TODO(), string(msg.Key), output.RoomID, output.Type, + ) + if err != nil { + log.WithFields(log.Fields{ + "type": output.Type, + "room_id": output.RoomID, + log.ErrorKey: err, + }).Panicf("could not save account data") + } + + s.notifier.OnNewEvent(nil, string(msg.Key), syncStreamPos) + + return nil +} diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go new file mode 100644 index 00000000..273b6aea --- /dev/null +++ b/syncapi/consumers/roomserver.go @@ -0,0 +1,286 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/sync" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// OutputRoomEventConsumer consumes events that originated in the room server. +type OutputRoomEventConsumer struct { + roomServerConsumer *common.ContinualConsumer + db *storage.SyncServerDatabase + notifier *sync.Notifier + query api.RoomserverQueryAPI +} + +// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. +func NewOutputRoomEventConsumer( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + n *sync.Notifier, + store *storage.SyncServerDatabase, + queryAPI api.RoomserverQueryAPI, +) *OutputRoomEventConsumer { + + consumer := common.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputRoomEvent), + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &OutputRoomEventConsumer{ + roomServerConsumer: &consumer, + db: store, + notifier: n, + query: queryAPI, + } + consumer.ProcessMessage = s.onMessage + + return s +} + +// Start consuming from room servers +func (s *OutputRoomEventConsumer) Start() error { + return s.roomServerConsumer.Start() +} + +// onMessage is called when the sync server receives a new event from the room server output log. +// It is not safe for this function to be called from multiple goroutines, or else the +// sync stream position may race and be incorrectly calculated. +func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { + // Parse out the event JSON + var output api.OutputEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + + switch output.Type { + case api.OutputTypeNewRoomEvent: + return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) + case api.OutputTypeNewInviteEvent: + return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) + case api.OutputTypeRetireInviteEvent: + return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent) + default: + log.WithField("type", output.Type).Debug( + "roomserver output log: ignoring unknown output type", + ) + return nil + } +} + +func (s *OutputRoomEventConsumer) onNewRoomEvent( + ctx context.Context, msg api.OutputNewRoomEvent, +) error { + ev := msg.Event + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_id": ev.RoomID(), + }).Info("received event from roomserver") + + addsStateEvents, err := s.lookupStateEvents(msg.AddsStateEventIDs, ev) + if err != nil { + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + log.ErrorKey: err, + "add": msg.AddsStateEventIDs, + "del": msg.RemovesStateEventIDs, + }).Panicf("roomserver output log: state event lookup failure") + } + + ev, err = s.updateStateEvent(ev) + if err != nil { + return err + } + + for i := range addsStateEvents { + addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i]) + if err != nil { + return err + } + } + + syncStreamPos, err := s.db.WriteEvent( + ctx, + &ev, + addsStateEvents, + msg.AddsStateEventIDs, + msg.RemovesStateEventIDs, + msg.TransactionID, + ) + if err != nil { + return err + } + + if err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + log.ErrorKey: err, + "add": msg.AddsStateEventIDs, + "del": msg.RemovesStateEventIDs, + }).Panicf("roomserver output log: write event failure") + return nil + } + s.notifier.OnNewEvent(&ev, "", types.StreamPosition(syncStreamPos)) + + return nil +} + +func (s *OutputRoomEventConsumer) onNewInviteEvent( + ctx context.Context, msg api.OutputNewInviteEvent, +) error { + syncStreamPos, err := s.db.AddInviteEvent(ctx, msg.Event) + if err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event": string(msg.Event.JSON()), + log.ErrorKey: err, + }).Panicf("roomserver output log: write invite failure") + return nil + } + s.notifier.OnNewEvent(&msg.Event, "", syncStreamPos) + return nil +} + +func (s *OutputRoomEventConsumer) onRetireInviteEvent( + ctx context.Context, msg api.OutputRetireInviteEvent, +) error { + err := s.db.RetireInviteEvent(ctx, msg.EventID) + if err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event_id": msg.EventID, + log.ErrorKey: err, + }).Panicf("roomserver output log: remove invite failure") + return nil + } + // TODO: Notify any active sync requests that the invite has been retired. + // s.notifier.OnNewEvent(nil, msg.TargetUserID, syncStreamPos) + return nil +} + +// lookupStateEvents looks up the state events that are added by a new event. +func (s *OutputRoomEventConsumer) lookupStateEvents( + addsStateEventIDs []string, event gomatrixserverlib.Event, +) ([]gomatrixserverlib.Event, error) { + // Fast path if there aren't any new state events. + if len(addsStateEventIDs) == 0 { + return nil, nil + } + + // Fast path if the only state event added is the event itself. + if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { + return []gomatrixserverlib.Event{event}, nil + } + + // Check if this is re-adding a state events that we previously processed + // If we have previously received a state event it may still be in + // our event database. + result, err := s.db.Events(context.TODO(), addsStateEventIDs) + if err != nil { + return nil, err + } + missing := missingEventsFrom(result, addsStateEventIDs) + + // Check if event itself is being added. + for _, eventID := range missing { + if eventID == event.EventID() { + result = append(result, event) + break + } + } + missing = missingEventsFrom(result, addsStateEventIDs) + + if len(missing) == 0 { + return result, nil + } + + // At this point the missing events are neither the event itself nor are + // they present in our local database. Our only option is to fetch them + // from the roomserver using the query API. + eventReq := api.QueryEventsByIDRequest{EventIDs: missing} + var eventResp api.QueryEventsByIDResponse + if err := s.query.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil { + return nil, err + } + + result = append(result, eventResp.Events...) + missing = missingEventsFrom(result, addsStateEventIDs) + + if len(missing) != 0 { + return nil, fmt.Errorf( + "missing %d state events IDs at event %q", len(missing), event.EventID(), + ) + } + + return result, nil +} + +func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.Event) (gomatrixserverlib.Event, error) { + var stateKey string + if event.StateKey() == nil { + stateKey = "" + } else { + stateKey = *event.StateKey() + } + + prevEvent, err := s.db.GetStateEvent( + context.TODO(), event.Type(), event.RoomID(), stateKey, + ) + if err != nil { + return event, err + } + + if prevEvent == nil { + return event, nil + } + + prev := types.PrevEventRef{ + PrevContent: prevEvent.Content(), + ReplacesState: prevEvent.EventID(), + PrevSender: prevEvent.Sender(), + } + + return event.SetUnsigned(prev) +} + +func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string { + have := map[string]bool{} + for _, event := range events { + have[event.EventID()] = true + } + var missing []string + for _, eventID := range required { + if !have[eventID] { + missing = append(missing, eventID) + } + } + return missing +} diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go new file mode 100644 index 00000000..93d939c3 --- /dev/null +++ b/syncapi/routing/routing.go @@ -0,0 +1,61 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "net/http" + + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/clientapi/auth" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/sync" + "github.com/matrix-org/util" +) + +const pathPrefixR0 = "/_matrix/client/r0" + +// Setup configures the given mux with sync-server listeners +func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServerDatabase, deviceDB *devices.Database) { + r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() + + authData := auth.Data{ + AccountDB: nil, + DeviceDB: deviceDB, + AppServices: nil, + } + + // TODO: Add AS support for all handlers below. + r0mux.Handle("/sync", common.MakeAuthAPI("sync", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse { + return srp.OnIncomingSyncRequest(req, device) + })).Methods(http.MethodGet, http.MethodOptions) + + r0mux.Handle("/rooms/{roomID}/state", common.MakeAuthAPI("room_state", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse { + vars := mux.Vars(req) + return OnIncomingStateRequest(req, syncDB, vars["roomID"]) + })).Methods(http.MethodGet, http.MethodOptions) + + r0mux.Handle("/rooms/{roomID}/state/{type}", common.MakeAuthAPI("room_state", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse { + vars := mux.Vars(req) + return OnIncomingStateTypeRequest(req, syncDB, vars["roomID"], vars["type"], "") + })).Methods(http.MethodGet, http.MethodOptions) + + r0mux.Handle("/rooms/{roomID}/state/{type}/{stateKey}", common.MakeAuthAPI("room_state", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse { + vars := mux.Vars(req) + return OnIncomingStateTypeRequest(req, syncDB, vars["roomID"], vars["type"], vars["stateKey"]) + })).Methods(http.MethodGet, http.MethodOptions) +} diff --git a/syncapi/routing/state.go b/syncapi/routing/state.go new file mode 100644 index 00000000..6b98a0b7 --- /dev/null +++ b/syncapi/routing/state.go @@ -0,0 +1,118 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "encoding/json" + "net/http" + + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + log "github.com/sirupsen/logrus" +) + +type stateEventInStateResp struct { + gomatrixserverlib.ClientEvent + PrevContent json.RawMessage `json:"prev_content,omitempty"` + ReplacesState string `json:"replaces_state,omitempty"` +} + +// OnIncomingStateRequest is called when a client makes a /rooms/{roomID}/state +// request. It will fetch all the state events from the specified room and will +// append the necessary keys to them if applicable before returning them. +// Returns an error if something went wrong in the process. +// TODO: Check if the user is in the room. If not, check if the room's history +// is publicly visible. Current behaviour is returning an empty array if the +// user cannot see the room's history. +func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatabase, roomID string) util.JSONResponse { + // TODO(#287): Auth request and handle the case where the user has left (where + // we should return the state at the poin they left) + + stateEvents, err := db.GetStateEventsForRoom(req.Context(), roomID) + if err != nil { + return httputil.LogThenError(req, err) + } + + resp := []stateEventInStateResp{} + // Fill the prev_content and replaces_state keys if necessary + for _, event := range stateEvents { + stateEvent := stateEventInStateResp{ + ClientEvent: gomatrixserverlib.ToClientEvent(event, gomatrixserverlib.FormatAll), + } + var prevEventRef types.PrevEventRef + if len(event.Unsigned()) > 0 { + if err := json.Unmarshal(event.Unsigned(), &prevEventRef); err != nil { + return httputil.LogThenError(req, err) + } + // Fills the previous state event ID if the state event replaces another + // state event + if len(prevEventRef.ReplacesState) > 0 { + stateEvent.ReplacesState = prevEventRef.ReplacesState + } + // Fill the previous event if the state event references a previous event + if prevEventRef.PrevContent != nil { + stateEvent.PrevContent = prevEventRef.PrevContent + } + } + + resp = append(resp, stateEvent) + } + + return util.JSONResponse{ + Code: http.StatusOK, + JSON: resp, + } +} + +// OnIncomingStateTypeRequest is called when a client makes a +// /rooms/{roomID}/state/{type}/{statekey} request. It will look in current +// state to see if there is an event with that type and state key, if there +// is then (by default) we return the content, otherwise a 404. +func OnIncomingStateTypeRequest(req *http.Request, db *storage.SyncServerDatabase, roomID string, evType, stateKey string) util.JSONResponse { + // TODO(#287): Auth request and handle the case where the user has left (where + // we should return the state at the poin they left) + + logger := util.GetLogger(req.Context()) + logger.WithFields(log.Fields{ + "roomID": roomID, + "evType": evType, + "stateKey": stateKey, + }).Info("Fetching state") + + event, err := db.GetStateEvent(req.Context(), roomID, evType, stateKey) + if err != nil { + return httputil.LogThenError(req, err) + } + + if event == nil { + return util.JSONResponse{ + Code: http.StatusNotFound, + JSON: jsonerror.NotFound("cannot find state"), + } + } + + stateEvent := stateEventInStateResp{ + ClientEvent: gomatrixserverlib.ToClientEvent(*event, gomatrixserverlib.FormatAll), + } + + return util.JSONResponse{ + Code: http.StatusOK, + JSON: stateEvent.Content, + } +} diff --git a/syncapi/storage/account_data_table.go b/syncapi/storage/account_data_table.go new file mode 100644 index 00000000..d4d74d15 --- /dev/null +++ b/syncapi/storage/account_data_table.go @@ -0,0 +1,141 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/common" + + "github.com/matrix-org/dendrite/syncapi/types" +) + +const accountDataSchema = ` +-- This sequence is shared between all the tables generated from kafka logs. +CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id; + +-- Stores the types of account data that a user set has globally and in each room +-- and the stream ID when that type was last updated. +CREATE TABLE IF NOT EXISTS syncapi_account_data_type ( + -- An incrementing ID which denotes the position in the log that this event resides at. + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), + -- ID of the user the data belongs to + user_id TEXT NOT NULL, + -- ID of the room the data is related to (empty string if not related to a specific room) + room_id TEXT NOT NULL, + -- Type of the data + type TEXT NOT NULL, + + -- We don't want two entries of the same type for the same user + CONSTRAINT syncapi_account_data_unique UNIQUE (user_id, room_id, type) +); + +CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account_data_type(id); +` + +const insertAccountDataSQL = "" + + "INSERT INTO syncapi_account_data_type (user_id, room_id, type) VALUES ($1, $2, $3)" + + " ON CONFLICT ON CONSTRAINT syncapi_account_data_unique" + + " DO UPDATE SET id = EXCLUDED.id" + + " RETURNING id" + +const selectAccountDataInRangeSQL = "" + + "SELECT room_id, type FROM syncapi_account_data_type" + + " WHERE user_id = $1 AND id > $2 AND id <= $3" + + " ORDER BY id ASC" + +const selectMaxAccountDataIDSQL = "" + + "SELECT MAX(id) FROM syncapi_account_data_type" + +type accountDataStatements struct { + insertAccountDataStmt *sql.Stmt + selectAccountDataInRangeStmt *sql.Stmt + selectMaxAccountDataIDStmt *sql.Stmt +} + +func (s *accountDataStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(accountDataSchema) + if err != nil { + return + } + if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil { + return + } + if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil { + return + } + if s.selectMaxAccountDataIDStmt, err = db.Prepare(selectMaxAccountDataIDSQL); err != nil { + return + } + return +} + +func (s *accountDataStatements) insertAccountData( + ctx context.Context, + userID, roomID, dataType string, +) (pos int64, err error) { + err = s.insertAccountDataStmt.QueryRowContext(ctx, userID, roomID, dataType).Scan(&pos) + return +} + +func (s *accountDataStatements) selectAccountDataInRange( + ctx context.Context, + userID string, + oldPos, newPos types.StreamPosition, +) (data map[string][]string, err error) { + data = make(map[string][]string) + + // If both positions are the same, it means that the data was saved after the + // latest room event. In that case, we need to decrement the old position as + // it would prevent the SQL request from returning anything. + if oldPos == newPos { + oldPos-- + } + + rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos) + if err != nil { + return + } + + for rows.Next() { + var dataType string + var roomID string + + if err = rows.Scan(&roomID, &dataType); err != nil { + return + } + + if len(data[roomID]) > 0 { + data[roomID] = append(data[roomID], dataType) + } else { + data[roomID] = []string{dataType} + } + } + + return +} + +func (s *accountDataStatements) selectMaxAccountDataID( + ctx context.Context, txn *sql.Tx, +) (id int64, err error) { + var nullableID sql.NullInt64 + stmt := common.TxStmt(txn, s.selectMaxAccountDataIDStmt) + err = stmt.QueryRowContext(ctx).Scan(&nullableID) + if nullableID.Valid { + id = nullableID.Int64 + } + return +} diff --git a/syncapi/storage/current_room_state_table.go b/syncapi/storage/current_room_state_table.go new file mode 100644 index 00000000..852bfd76 --- /dev/null +++ b/syncapi/storage/current_room_state_table.go @@ -0,0 +1,249 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" + + "github.com/lib/pq" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrixserverlib" +) + +const currentRoomStateSchema = ` +-- Stores the current room state for every room. +CREATE TABLE IF NOT EXISTS syncapi_current_room_state ( + -- The 'room_id' key for the state event. + room_id TEXT NOT NULL, + -- The state event ID + event_id TEXT NOT NULL, + -- The state event type e.g 'm.room.member' + type TEXT NOT NULL, + -- The state_key value for this state event e.g '' + state_key TEXT NOT NULL, + -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. + event_json TEXT NOT NULL, + -- The 'content.membership' value if this event is an m.room.member event. For other + -- events, this will be NULL. + membership TEXT, + -- The serial ID of the output_room_events table when this event became + -- part of the current state of the room. + added_at BIGINT, + -- Clobber based on 3-uple of room_id, type and state_key + CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key) +); +-- for event deletion +CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id); +-- for querying membership states of users +CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave'; +` + +const upsertRoomStateSQL = "" + + "INSERT INTO syncapi_current_room_state (room_id, event_id, type, state_key, event_json, membership, added_at)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7)" + + " ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" + + " DO UPDATE SET event_id = $2, event_json = $5, membership = $6, added_at = $7" + +const deleteRoomStateByEventIDSQL = "" + + "DELETE FROM syncapi_current_room_state WHERE event_id = $1" + +const selectRoomIDsWithMembershipSQL = "" + + "SELECT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2" + +const selectCurrentStateSQL = "" + + "SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1" + +const selectJoinedUsersSQL = "" + + "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'" + +const selectStateEventSQL = "" + + "SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3" + +const selectEventsWithEventIDsSQL = "" + + "SELECT added_at, event_json FROM syncapi_current_room_state WHERE event_id = ANY($1)" + +type currentRoomStateStatements struct { + upsertRoomStateStmt *sql.Stmt + deleteRoomStateByEventIDStmt *sql.Stmt + selectRoomIDsWithMembershipStmt *sql.Stmt + selectCurrentStateStmt *sql.Stmt + selectJoinedUsersStmt *sql.Stmt + selectEventsWithEventIDsStmt *sql.Stmt + selectStateEventStmt *sql.Stmt +} + +func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(currentRoomStateSchema) + if err != nil { + return + } + if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil { + return + } + if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil { + return + } + if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil { + return + } + if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil { + return + } + if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil { + return + } + if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil { + return + } + if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { + return + } + return +} + +// JoinedMemberLists returns a map of room ID to a list of joined user IDs. +func (s *currentRoomStateStatements) selectJoinedUsers( + ctx context.Context, +) (map[string][]string, error) { + rows, err := s.selectJoinedUsersStmt.QueryContext(ctx) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + + result := make(map[string][]string) + for rows.Next() { + var roomID string + var userID string + if err := rows.Scan(&roomID, &userID); err != nil { + return nil, err + } + users := result[roomID] + users = append(users, userID) + result[roomID] = users + } + return result, nil +} + +// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state. +func (s *currentRoomStateStatements) selectRoomIDsWithMembership( + ctx context.Context, + txn *sql.Tx, + userID string, + membership string, // nolint: unparam +) ([]string, error) { + stmt := common.TxStmt(txn, s.selectRoomIDsWithMembershipStmt) + rows, err := stmt.QueryContext(ctx, userID, membership) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + + var result []string + for rows.Next() { + var roomID string + if err := rows.Scan(&roomID); err != nil { + return nil, err + } + result = append(result, roomID) + } + return result, nil +} + +// CurrentState returns all the current state events for the given room. +func (s *currentRoomStateStatements) selectCurrentState( + ctx context.Context, txn *sql.Tx, roomID string, +) ([]gomatrixserverlib.Event, error) { + stmt := common.TxStmt(txn, s.selectCurrentStateStmt) + rows, err := stmt.QueryContext(ctx, roomID) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + + return rowsToEvents(rows) +} + +func (s *currentRoomStateStatements) deleteRoomStateByEventID( + ctx context.Context, txn *sql.Tx, eventID string, +) error { + stmt := common.TxStmt(txn, s.deleteRoomStateByEventIDStmt) + _, err := stmt.ExecContext(ctx, eventID) + return err +} + +func (s *currentRoomStateStatements) upsertRoomState( + ctx context.Context, txn *sql.Tx, + event gomatrixserverlib.Event, membership *string, addedAt int64, +) error { + stmt := common.TxStmt(txn, s.upsertRoomStateStmt) + _, err := stmt.ExecContext( + ctx, + event.RoomID(), + event.EventID(), + event.Type(), + *event.StateKey(), + event.JSON(), + membership, + addedAt, + ) + return err +} + +func (s *currentRoomStateStatements) selectEventsWithEventIDs( + ctx context.Context, txn *sql.Tx, eventIDs []string, +) ([]streamEvent, error) { + stmt := common.TxStmt(txn, s.selectEventsWithEventIDsStmt) + rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs)) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + return rowsToStreamEvents(rows) +} + +func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { + result := []gomatrixserverlib.Event{} + for rows.Next() { + var eventBytes []byte + if err := rows.Scan(&eventBytes); err != nil { + return nil, err + } + // TODO: Handle redacted events + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) + if err != nil { + return nil, err + } + result = append(result, ev) + } + return result, nil +} + +func (s *currentRoomStateStatements) selectStateEvent( + ctx context.Context, roomID, evType, stateKey string, +) (*gomatrixserverlib.Event, error) { + stmt := s.selectStateEventStmt + var res []byte + err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false) + return &ev, err +} diff --git a/syncapi/storage/invites_table.go b/syncapi/storage/invites_table.go new file mode 100644 index 00000000..88c98f7e --- /dev/null +++ b/syncapi/storage/invites_table.go @@ -0,0 +1,133 @@ +package storage + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrixserverlib" +) + +const inviteEventsSchema = ` +CREATE TABLE IF NOT EXISTS syncapi_invite_events ( + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + target_user_id TEXT NOT NULL, + event_json TEXT NOT NULL +); + +-- For looking up the invites for a given user. +CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx + ON syncapi_invite_events (target_user_id, id); + +-- For deleting old invites +CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx + ON syncapi_invite_events(target_user_id, id); +` + +const insertInviteEventSQL = "" + + "INSERT INTO syncapi_invite_events (" + + " room_id, event_id, target_user_id, event_json" + + ") VALUES ($1, $2, $3, $4) RETURNING id" + +const deleteInviteEventSQL = "" + + "DELETE FROM syncapi_invite_events WHERE event_id = $1" + +const selectInviteEventsInRangeSQL = "" + + "SELECT room_id, event_json FROM syncapi_invite_events" + + " WHERE target_user_id = $1 AND id > $2 AND id <= $3" + + " ORDER BY id DESC" + +const selectMaxInviteIDSQL = "" + + "SELECT MAX(id) FROM syncapi_invite_events" + +type inviteEventsStatements struct { + insertInviteEventStmt *sql.Stmt + selectInviteEventsInRangeStmt *sql.Stmt + deleteInviteEventStmt *sql.Stmt + selectMaxInviteIDStmt *sql.Stmt +} + +func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(inviteEventsSchema) + if err != nil { + return + } + if s.insertInviteEventStmt, err = db.Prepare(insertInviteEventSQL); err != nil { + return + } + if s.selectInviteEventsInRangeStmt, err = db.Prepare(selectInviteEventsInRangeSQL); err != nil { + return + } + if s.deleteInviteEventStmt, err = db.Prepare(deleteInviteEventSQL); err != nil { + return + } + if s.selectMaxInviteIDStmt, err = db.Prepare(selectMaxInviteIDSQL); err != nil { + return + } + return +} + +func (s *inviteEventsStatements) insertInviteEvent( + ctx context.Context, inviteEvent gomatrixserverlib.Event, +) (streamPos int64, err error) { + err = s.insertInviteEventStmt.QueryRowContext( + ctx, + inviteEvent.RoomID(), + inviteEvent.EventID(), + *inviteEvent.StateKey(), + inviteEvent.JSON(), + ).Scan(&streamPos) + return +} + +func (s *inviteEventsStatements) deleteInviteEvent( + ctx context.Context, inviteEventID string, +) error { + _, err := s.deleteInviteEventStmt.ExecContext(ctx, inviteEventID) + return err +} + +// selectInviteEventsInRange returns a map of room ID to invite event for the +// active invites for the target user ID in the supplied range. +func (s *inviteEventsStatements) selectInviteEventsInRange( + ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos int64, +) (map[string]gomatrixserverlib.Event, error) { + stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt) + rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + result := map[string]gomatrixserverlib.Event{} + for rows.Next() { + var ( + roomID string + eventJSON []byte + ) + if err = rows.Scan(&roomID, &eventJSON); err != nil { + return nil, err + } + + event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false) + if err != nil { + return nil, err + } + + result[roomID] = event + } + return result, nil +} + +func (s *inviteEventsStatements) selectMaxInviteID( + ctx context.Context, txn *sql.Tx, +) (id int64, err error) { + var nullableID sql.NullInt64 + stmt := common.TxStmt(txn, s.selectMaxInviteIDStmt) + err = stmt.QueryRowContext(ctx).Scan(&nullableID) + if nullableID.Valid { + id = nullableID.Int64 + } + return +} diff --git a/syncapi/storage/output_room_events_table.go b/syncapi/storage/output_room_events_table.go new file mode 100644 index 00000000..035db988 --- /dev/null +++ b/syncapi/storage/output_room_events_table.go @@ -0,0 +1,294 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" + "sort" + + "github.com/matrix-org/dendrite/roomserver/api" + + "github.com/lib/pq" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" +) + +const outputRoomEventsSchema = ` +-- This sequence is shared between all the tables generated from kafka logs. +CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id; + +-- Stores output room events received from the roomserver. +CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( + -- An incrementing ID which denotes the position in the log that this event resides at. + -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments. + -- This isn't a problem for us since we just want to order by this field. + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), + -- The event ID for the event + event_id TEXT NOT NULL, + -- The 'room_id' key for the event. + room_id TEXT NOT NULL, + -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. + event_json TEXT NOT NULL, + -- A list of event IDs which represent a delta of added/removed room state. This can be NULL + -- if there is no delta. + add_state_ids TEXT[], + remove_state_ids TEXT[], + device_id TEXT, -- The local device that sent the event, if any + transaction_id TEXT -- The transaction id used to send the event, if any +); +-- for event selection +CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id); +` + +const insertEventSQL = "" + + "INSERT INTO syncapi_output_room_events (" + + " room_id, event_id, event_json, add_state_ids, remove_state_ids, device_id, transaction_id" + + ") VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id" + +const selectEventsSQL = "" + + "SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)" + +const selectRecentEventsSQL = "" + + "SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" + + " WHERE room_id = $1 AND id > $2 AND id <= $3" + + " ORDER BY id DESC LIMIT $4" + +const selectMaxEventIDSQL = "" + + "SELECT MAX(id) FROM syncapi_output_room_events" + +// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). +const selectStateInRangeSQL = "" + + "SELECT id, event_json, add_state_ids, remove_state_ids" + + " FROM syncapi_output_room_events" + + " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + + " ORDER BY id ASC" + +type outputRoomEventsStatements struct { + insertEventStmt *sql.Stmt + selectEventsStmt *sql.Stmt + selectMaxEventIDStmt *sql.Stmt + selectRecentEventsStmt *sql.Stmt + selectStateInRangeStmt *sql.Stmt +} + +func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(outputRoomEventsSchema) + if err != nil { + return + } + if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { + return + } + if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil { + return + } + if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil { + return + } + if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { + return + } + if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { + return + } + return +} + +// selectStateInRange returns the state events between the two given stream positions, exclusive of oldPos, inclusive of newPos. +// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the +// two positions, only the most recent state is returned. +func (s *outputRoomEventsStatements) selectStateInRange( + ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition, +) (map[string]map[string]bool, map[string]streamEvent, error) { + stmt := common.TxStmt(txn, s.selectStateInRangeStmt) + + rows, err := stmt.QueryContext(ctx, oldPos, newPos) + if err != nil { + return nil, nil, err + } + // Fetch all the state change events for all rooms between the two positions then loop each event and: + // - Keep a cache of the event by ID (99% of state change events are for the event itself) + // - For each room ID, build up an array of event IDs which represents cumulative adds/removes + // For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID + // if they aren't in the event ID cache. We don't handle state deletion yet. + eventIDToEvent := make(map[string]streamEvent) + + // RoomID => A set (map[string]bool) of state event IDs which are between the two positions + stateNeeded := make(map[string]map[string]bool) + + for rows.Next() { + var ( + streamPos int64 + eventBytes []byte + addIDs pq.StringArray + delIDs pq.StringArray + ) + if err := rows.Scan(&streamPos, &eventBytes, &addIDs, &delIDs); err != nil { + return nil, nil, err + } + // Sanity check for deleted state and whine if we see it. We don't need to do anything + // since it'll just mark the event as not being needed. + if len(addIDs) < len(delIDs) { + log.WithFields(log.Fields{ + "since": oldPos, + "current": newPos, + "adds": addIDs, + "dels": delIDs, + }).Warn("StateBetween: ignoring deleted state") + } + + // TODO: Handle redacted events + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) + if err != nil { + return nil, nil, err + } + needSet := stateNeeded[ev.RoomID()] + if needSet == nil { // make set if required + needSet = make(map[string]bool) + } + for _, id := range delIDs { + needSet[id] = false + } + for _, id := range addIDs { + needSet[id] = true + } + stateNeeded[ev.RoomID()] = needSet + + eventIDToEvent[ev.EventID()] = streamEvent{ + Event: ev, + streamPosition: types.StreamPosition(streamPos), + } + } + + return stateNeeded, eventIDToEvent, nil +} + +// MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied, +// then this function should only ever be used at startup, as it will race with inserting events if it is +// done afterwards. If there are no inserted events, 0 is returned. +func (s *outputRoomEventsStatements) selectMaxEventID( + ctx context.Context, txn *sql.Tx, +) (id int64, err error) { + var nullableID sql.NullInt64 + stmt := common.TxStmt(txn, s.selectMaxEventIDStmt) + err = stmt.QueryRowContext(ctx).Scan(&nullableID) + if nullableID.Valid { + id = nullableID.Int64 + } + return +} + +// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. Returns the position +// of the inserted event. +func (s *outputRoomEventsStatements) insertEvent( + ctx context.Context, txn *sql.Tx, + event *gomatrixserverlib.Event, addState, removeState []string, + transactionID *api.TransactionID, +) (streamPos int64, err error) { + var deviceID, txnID *string + if transactionID != nil { + deviceID = &transactionID.DeviceID + txnID = &transactionID.TransactionID + } + + stmt := common.TxStmt(txn, s.insertEventStmt) + err = stmt.QueryRowContext( + ctx, + event.RoomID(), + event.EventID(), + event.JSON(), + pq.StringArray(addState), + pq.StringArray(removeState), + deviceID, + txnID, + ).Scan(&streamPos) + return +} + +// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'. +func (s *outputRoomEventsStatements) selectRecentEvents( + ctx context.Context, txn *sql.Tx, + roomID string, fromPos, toPos types.StreamPosition, limit int, +) ([]streamEvent, error) { + stmt := common.TxStmt(txn, s.selectRecentEventsStmt) + rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + events, err := rowsToStreamEvents(rows) + if err != nil { + return nil, err + } + // The events need to be returned from oldest to latest, which isn't + // necessary the way the SQL query returns them, so a sort is necessary to + // ensure the events are in the right order in the slice. + sort.SliceStable(events, func(i int, j int) bool { + return events[i].streamPosition < events[j].streamPosition + }) + return events, nil +} + +// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing +// from the database. +func (s *outputRoomEventsStatements) selectEvents( + ctx context.Context, txn *sql.Tx, eventIDs []string, +) ([]streamEvent, error) { + stmt := common.TxStmt(txn, s.selectEventsStmt) + rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs)) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + return rowsToStreamEvents(rows) +} + +func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) { + var result []streamEvent + for rows.Next() { + var ( + streamPos int64 + eventBytes []byte + deviceID *string + txnID *string + transactionID *api.TransactionID + ) + if err := rows.Scan(&streamPos, &eventBytes, &deviceID, &txnID); err != nil { + return nil, err + } + // TODO: Handle redacted events + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) + if err != nil { + return nil, err + } + + if deviceID != nil && txnID != nil { + transactionID = &api.TransactionID{ + DeviceID: *deviceID, + TransactionID: *txnID, + } + } + + result = append(result, streamEvent{ + Event: ev, + streamPosition: types.StreamPosition(streamPos), + transactionID: transactionID, + }) + } + return result, nil +} diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/syncserver.go new file mode 100644 index 00000000..ec973e2c --- /dev/null +++ b/syncapi/storage/syncserver.go @@ -0,0 +1,695 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" + "fmt" + + "github.com/sirupsen/logrus" + + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/roomserver/api" + // Import the postgres database driver. + _ "github.com/lib/pq" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +type stateDelta struct { + roomID string + stateEvents []gomatrixserverlib.Event + membership string + // The stream position of the latest membership event for this user, if applicable. + // Can be 0 if there is no membership event in this delta. + membershipPos types.StreamPosition +} + +// Same as gomatrixserverlib.Event but also has the stream position for this event. +type streamEvent struct { + gomatrixserverlib.Event + streamPosition types.StreamPosition + transactionID *api.TransactionID +} + +// SyncServerDatabase represents a sync server database +type SyncServerDatabase struct { + db *sql.DB + common.PartitionOffsetStatements + accountData accountDataStatements + events outputRoomEventsStatements + roomstate currentRoomStateStatements + invites inviteEventsStatements +} + +// NewSyncServerDatabase creates a new sync server database +func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { + var d SyncServerDatabase + var err error + if d.db, err = sql.Open("postgres", dataSourceName); err != nil { + return nil, err + } + if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil { + return nil, err + } + if err = d.accountData.prepare(d.db); err != nil { + return nil, err + } + if err = d.events.prepare(d.db); err != nil { + return nil, err + } + if err := d.roomstate.prepare(d.db); err != nil { + return nil, err + } + if err := d.invites.prepare(d.db); err != nil { + return nil, err + } + return &d, nil +} + +// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. +func (d *SyncServerDatabase) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) { + return d.roomstate.selectJoinedUsers(ctx) +} + +// Events lookups a list of event by their event ID. +// Returns a list of events matching the requested IDs found in the database. +// If an event is not found in the database then it will be omitted from the list. +// Returns an error if there was a problem talking with the database. +// Does not include any transaction IDs in the returned events. +func (d *SyncServerDatabase) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) { + streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs) + if err != nil { + return nil, err + } + + // We don't include a device here as we only include transaction IDs in + // incremental syncs. + return streamEventsToEvents(nil, streamEvents), nil +} + +// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races +// when generating the stream position for this event. Returns the sync stream position for the inserted event. +// Returns an error if there was a problem inserting this event. +func (d *SyncServerDatabase) WriteEvent( + ctx context.Context, + ev *gomatrixserverlib.Event, + addStateEvents []gomatrixserverlib.Event, + addStateEventIDs, removeStateEventIDs []string, + transactionID *api.TransactionID, +) (streamPos types.StreamPosition, returnErr error) { + returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { + var err error + pos, err := d.events.insertEvent(ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID) + if err != nil { + return err + } + streamPos = types.StreamPosition(pos) + + if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { + // Nothing to do, the event may have just been a message event. + return nil + } + + return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, streamPos) + }) + return +} + +func (d *SyncServerDatabase) updateRoomState( + ctx context.Context, txn *sql.Tx, + removedEventIDs []string, + addedEvents []gomatrixserverlib.Event, + streamPos types.StreamPosition, +) error { + // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add. + for _, eventID := range removedEventIDs { + if err := d.roomstate.deleteRoomStateByEventID(ctx, txn, eventID); err != nil { + return err + } + } + + for _, event := range addedEvents { + if event.StateKey() == nil { + // ignore non state events + continue + } + var membership *string + if event.Type() == "m.room.member" { + value, err := event.Membership() + if err != nil { + return err + } + membership = &value + } + if err := d.roomstate.upsertRoomState(ctx, txn, event, membership, int64(streamPos)); err != nil { + return err + } + } + + return nil +} + +// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key +// If no event could be found, returns nil +// If there was an issue during the retrieval, returns an error +func (d *SyncServerDatabase) GetStateEvent( + ctx context.Context, roomID, evType, stateKey string, +) (*gomatrixserverlib.Event, error) { + return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey) +} + +// GetStateEventsForRoom fetches the state events for a given room. +// Returns an empty slice if no state events could be found for this room. +// Returns an error if there was an issue with the retrieval. +func (d *SyncServerDatabase) GetStateEventsForRoom( + ctx context.Context, roomID string, +) (stateEvents []gomatrixserverlib.Event, err error) { + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { + stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID) + return err + }) + return +} + +// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. +func (d *SyncServerDatabase) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { + return d.syncStreamPositionTx(ctx, nil) +} + +func (d *SyncServerDatabase) syncStreamPositionTx( + ctx context.Context, txn *sql.Tx, +) (types.StreamPosition, error) { + maxID, err := d.events.selectMaxEventID(ctx, txn) + if err != nil { + return 0, err + } + maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn) + if err != nil { + return 0, err + } + if maxAccountDataID > maxID { + maxID = maxAccountDataID + } + maxInviteID, err := d.invites.selectMaxInviteID(ctx, txn) + if err != nil { + return 0, err + } + if maxInviteID > maxID { + maxID = maxInviteID + } + return types.StreamPosition(maxID), nil +} + +// IncrementalSync returns all the data needed in order to create an incremental +// sync response for the given user. Events returned will include any client +// transaction IDs associated with the given device. These transaction IDs come +// from when the device sent the event via an API that included a transaction +// ID. +func (d *SyncServerDatabase) IncrementalSync( + ctx context.Context, + device authtypes.Device, + fromPos, toPos types.StreamPosition, + numRecentEventsPerRoom int, +) (*types.Response, error) { + txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) + if err != nil { + return nil, err + } + var succeeded bool + defer common.EndTransaction(txn, &succeeded) + + // Work out which rooms to return in the response. This is done by getting not only the currently + // joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions. + // This works out what the 'state' key should be for each room as well as which membership block + // to put the room into. + deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) + if err != nil { + return nil, err + } + + res := types.NewResponse(toPos) + for _, delta := range deltas { + err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) + if err != nil { + return nil, err + } + } + + // TODO: This should be done in getStateDeltas + if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil { + return nil, err + } + + succeeded = true + return res, nil +} + +// CompleteSync a complete /sync API response for the given user. +func (d *SyncServerDatabase) CompleteSync( + ctx context.Context, userID string, numRecentEventsPerRoom int, +) (*types.Response, error) { + // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have + // a consistent view of the database throughout. This includes extracting the sync stream position. + // This does have the unfortunate side-effect that all the matrixy logic resides in this function, + // but it's better to not hide the fact that this is being done in a transaction. + txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) + if err != nil { + return nil, err + } + var succeeded bool + defer common.EndTransaction(txn, &succeeded) + + // Get the current stream position which we will base the sync response on. + pos, err := d.syncStreamPositionTx(ctx, txn) + if err != nil { + return nil, err + } + + // Extract room state and recent events for all rooms the user is joined to. + roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") + if err != nil { + return nil, err + } + + // Build up a /sync response. Add joined rooms. + res := types.NewResponse(pos) + for _, roomID := range roomIDs { + var stateEvents []gomatrixserverlib.Event + stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID) + if err != nil { + return nil, err + } + // TODO: When filters are added, we may need to call this multiple times to get enough events. + // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 + var recentStreamEvents []streamEvent + recentStreamEvents, err = d.events.selectRecentEvents( + ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom, + ) + if err != nil { + return nil, err + } + + // We don't include a device here as we don't need to send down + // transaction IDs for complete syncs + recentEvents := streamEventsToEvents(nil, recentStreamEvents) + + stateEvents = removeDuplicates(stateEvents, recentEvents) + jr := types.NewJoinResponse() + if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 { + jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String() + } else { + jr.Timeline.PrevBatch = types.StreamPosition(1).String() + } + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = true + jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[roomID] = *jr + } + + if err = d.addInvitesToResponse(ctx, txn, userID, 0, pos, res); err != nil { + return nil, err + } + + succeeded = true + return res, err +} + +var txReadOnlySnapshot = sql.TxOptions{ + // Set the isolation level so that we see a snapshot of the database. + // In PostgreSQL repeatable read transactions will see a snapshot taken + // at the first query, and since the transaction is read-only it can't + // run into any serialisation errors. + // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, +} + +// GetAccountDataInRange returns all account data for a given user inserted or +// updated between two given positions +// Returns a map following the format data[roomID] = []dataTypes +// If no data is retrieved, returns an empty map +// If there was an issue with the retrieval, returns an error +func (d *SyncServerDatabase) GetAccountDataInRange( + ctx context.Context, userID string, oldPos, newPos types.StreamPosition, +) (map[string][]string, error) { + return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos) +} + +// UpsertAccountData keeps track of new or updated account data, by saving the type +// of the new/updated data, and the user ID and room ID the data is related to (empty) +// room ID means the data isn't specific to any room) +// If no data with the given type, user ID and room ID exists in the database, +// creates a new row, else update the existing one +// Returns an error if there was an issue with the upsert +func (d *SyncServerDatabase) UpsertAccountData( + ctx context.Context, userID, roomID, dataType string, +) (types.StreamPosition, error) { + pos, err := d.accountData.insertAccountData(ctx, userID, roomID, dataType) + return types.StreamPosition(pos), err +} + +// AddInviteEvent stores a new invite event for a user. +// If the invite was successfully stored this returns the stream ID it was stored at. +// Returns an error if there was a problem communicating with the database. +func (d *SyncServerDatabase) AddInviteEvent( + ctx context.Context, inviteEvent gomatrixserverlib.Event, +) (types.StreamPosition, error) { + pos, err := d.invites.insertInviteEvent(ctx, inviteEvent) + return types.StreamPosition(pos), err +} + +// RetireInviteEvent removes an old invite event from the database. +// Returns an error if there was a problem communicating with the database. +func (d *SyncServerDatabase) RetireInviteEvent( + ctx context.Context, inviteEventID string, +) error { + // TODO: Record that invite has been retired in a stream so that we can + // notify the user in an incremental sync. + err := d.invites.deleteInviteEvent(ctx, inviteEventID) + return err +} + +func (d *SyncServerDatabase) addInvitesToResponse( + ctx context.Context, txn *sql.Tx, + userID string, + fromPos, toPos types.StreamPosition, + res *types.Response, +) error { + invites, err := d.invites.selectInviteEventsInRange( + ctx, txn, userID, int64(fromPos), int64(toPos), + ) + if err != nil { + return err + } + for roomID, inviteEvent := range invites { + ir := types.NewInviteResponse() + ir.InviteState.Events = gomatrixserverlib.ToClientEvents( + []gomatrixserverlib.Event{inviteEvent}, gomatrixserverlib.FormatSync, + ) + // TODO: add the invite state from the invite event. + res.Rooms.Invite[roomID] = *ir + } + return nil +} + +// addRoomDeltaToResponse adds a room state delta to a sync response +func (d *SyncServerDatabase) addRoomDeltaToResponse( + ctx context.Context, + device *authtypes.Device, + txn *sql.Tx, + fromPos, toPos types.StreamPosition, + delta stateDelta, + numRecentEventsPerRoom int, + res *types.Response, +) error { + endPos := toPos + if delta.membershipPos > 0 && delta.membership == "leave" { + // make sure we don't leak recent events after the leave event. + // TODO: History visibility makes this somewhat complex to handle correctly. For example: + // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). + // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave + // in a single /sync request + // This is all "okay" assuming history_visibility == "shared" which it is by default. + endPos = delta.membershipPos + } + recentStreamEvents, err := d.events.selectRecentEvents( + ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom, + ) + if err != nil { + return err + } + recentEvents := streamEventsToEvents(device, recentStreamEvents) + delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back + + // Don't bother appending empty room entries + if len(recentEvents) == 0 && len(delta.stateEvents) == 0 { + return nil + } + + switch delta.membership { + case "join": + jr := types.NewJoinResponse() + if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 { + jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String() + } else { + jr.Timeline.PrevBatch = types.StreamPosition(1).String() + } + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[delta.roomID] = *jr + case "leave": + fallthrough // transitions to leave are the same as ban + case "ban": + // TODO: recentEvents may contain events that this user is not allowed to see because they are + // no longer in the room. + lr := types.NewLeaveResponse() + if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 { + lr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String() + } else { + lr.Timeline.PrevBatch = types.StreamPosition(1).String() + } + lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Leave[delta.roomID] = *lr + } + + return nil +} + +// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database. +// Returns a map of room ID to list of events. +func (d *SyncServerDatabase) fetchStateEvents( + ctx context.Context, txn *sql.Tx, + roomIDToEventIDSet map[string]map[string]bool, + eventIDToEvent map[string]streamEvent, +) (map[string][]streamEvent, error) { + stateBetween := make(map[string][]streamEvent) + missingEvents := make(map[string][]string) + for roomID, ids := range roomIDToEventIDSet { + events := stateBetween[roomID] + for id, need := range ids { + if !need { + continue // deleted state + } + e, ok := eventIDToEvent[id] + if ok { + events = append(events, e) + } else { + m := missingEvents[roomID] + m = append(m, id) + missingEvents[roomID] = m + } + } + stateBetween[roomID] = events + } + + if len(missingEvents) > 0 { + // This happens when add_state_ids has an event ID which is not in the provided range. + // We need to explicitly fetch them. + allMissingEventIDs := []string{} + for _, missingEvIDs := range missingEvents { + allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...) + } + evs, err := d.fetchMissingStateEvents(ctx, txn, allMissingEventIDs) + if err != nil { + return nil, err + } + // we know we got them all otherwise an error would've been returned, so just loop the events + for _, ev := range evs { + roomID := ev.RoomID() + stateBetween[roomID] = append(stateBetween[roomID], ev) + } + } + return stateBetween, nil +} + +func (d *SyncServerDatabase) fetchMissingStateEvents( + ctx context.Context, txn *sql.Tx, eventIDs []string, +) ([]streamEvent, error) { + // Fetch from the events table first so we pick up the stream ID for the + // event. + events, err := d.events.selectEvents(ctx, txn, eventIDs) + if err != nil { + return nil, err + } + + have := map[string]bool{} + for _, event := range events { + have[event.EventID()] = true + } + var missing []string + for _, eventID := range eventIDs { + if !have[eventID] { + missing = append(missing, eventID) + } + } + if len(missing) == 0 { + return events, nil + } + + // If they are missing from the events table then they should be state + // events that we received from outside the main event stream. + // These should be in the room state table. + stateEvents, err := d.roomstate.selectEventsWithEventIDs(ctx, txn, missing) + + if err != nil { + return nil, err + } + if len(stateEvents) != len(missing) { + return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing)) + } + events = append(events, stateEvents...) + return events, nil +} + +func (d *SyncServerDatabase) getStateDeltas( + ctx context.Context, device *authtypes.Device, txn *sql.Tx, + fromPos, toPos types.StreamPosition, userID string, +) ([]stateDelta, error) { + // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 + // - Get membership list changes for this user in this sync response + // - For each room which has membership list changes: + // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO). + // If it is, then we need to send the full room state down (and 'limited' is always true). + // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. + // * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block. + // - Get all CURRENTLY joined rooms, and add them to 'joined' block. + var deltas []stateDelta + + // get all the state events ever between these two positions + stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos) + if err != nil { + return nil, err + } + state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) + if err != nil { + return nil, err + } + + for roomID, stateStreamEvents := range state { + for _, ev := range stateStreamEvents { + // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event. + // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this, + // dupe join events will result in the entire room state coming down to the client again. This is added in + // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to + // the timeline. + if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { + if membership == "join" { + // send full room state down instead of a delta + var allState []gomatrixserverlib.Event + allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID) + if err != nil { + return nil, err + } + s := make([]streamEvent, len(allState)) + for i := 0; i < len(s); i++ { + s[i] = streamEvent{Event: allState[i], streamPosition: types.StreamPosition(0)} + } + state[roomID] = s + continue // we'll add this room in when we do joined rooms + } + + deltas = append(deltas, stateDelta{ + membership: membership, + membershipPos: ev.streamPosition, + stateEvents: streamEventsToEvents(device, stateStreamEvents), + roomID: roomID, + }) + break + } + } + } + + // Add in currently joined rooms + joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") + if err != nil { + return nil, err + } + for _, joinedRoomID := range joinedRoomIDs { + deltas = append(deltas, stateDelta{ + membership: "join", + stateEvents: streamEventsToEvents(device, state[joinedRoomID]), + roomID: joinedRoomID, + }) + } + + return deltas, nil +} + +// streamEventsToEvents converts streamEvent to Event. If device is non-nil and +// matches the streamevent.transactionID device then the transaction ID gets +// added to the unsigned section of the output event. +func streamEventsToEvents(device *authtypes.Device, in []streamEvent) []gomatrixserverlib.Event { + out := make([]gomatrixserverlib.Event, len(in)) + for i := 0; i < len(in); i++ { + out[i] = in[i].Event + if device != nil && in[i].transactionID != nil { + if device.UserID == in[i].Sender() && device.ID == in[i].transactionID.DeviceID { + err := out[i].SetUnsignedField( + "transaction_id", in[i].transactionID.TransactionID, + ) + if err != nil { + logrus.WithFields(logrus.Fields{ + "event_id": out[i].EventID(), + }).WithError(err).Warnf("Failed to add transaction ID to event") + } + } + } + } + return out +} + +// There may be some overlap where events in stateEvents are already in recentEvents, so filter +// them out so we don't include them twice in the /sync response. They should be in recentEvents +// only, so clients get to the correct state once they have rolled forward. +func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gomatrixserverlib.Event { + for _, recentEv := range recentEvents { + if recentEv.StateKey() == nil { + continue // not a state event + } + // TODO: This is a linear scan over all the current state events in this room. This will + // be slow for big rooms. We should instead sort the state events by event ID (ORDER BY) + // then do a binary search to find matching events, similar to what roomserver does. + for j := 0; j < len(stateEvents); j++ { + if stateEvents[j].EventID() == recentEv.EventID() { + // overwrite the element to remove with the last element then pop the last element. + // This is orders of magnitude faster than re-slicing, but doesn't preserve ordering + // (we don't care about the order of stateEvents) + stateEvents[j] = stateEvents[len(stateEvents)-1] + stateEvents = stateEvents[:len(stateEvents)-1] + break // there shouldn't be multiple events with the same event ID + } + } + } + return stateEvents +} + +// getMembershipFromEvent returns the value of content.membership iff the event is a state event +// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned. +func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string { + if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) { + membership, err := ev.Membership() + if err != nil { + return "" + } + return membership + } + return "" +} diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go new file mode 100644 index 00000000..5ed701d8 --- /dev/null +++ b/syncapi/sync/notifier.go @@ -0,0 +1,243 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "context" + "sync" + "time" + + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" +) + +// Notifier will wake up sleeping requests when there is some new data. +// It does not tell requests what that data is, only the stream position which +// they can use to get at it. This is done to prevent races whereby we tell the caller +// the event, but the token has already advanced by the time they fetch it, resulting +// in missed events. +type Notifier struct { + // A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine + roomIDToJoinedUsers map[string]userIDSet + // Protects currPos and userStreams. + streamLock *sync.Mutex + // The latest sync stream position + currPos types.StreamPosition + // A map of user_id => UserStream which can be used to wake a given user's /sync request. + userStreams map[string]*UserStream + // The last time we cleaned out stale entries from the userStreams map + lastCleanUpTime time.Time +} + +// NewNotifier creates a new notifier set to the given stream position. +// In order for this to be of any use, the Notifier needs to be told all rooms and +// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase). +func NewNotifier(pos types.StreamPosition) *Notifier { + return &Notifier{ + currPos: pos, + roomIDToJoinedUsers: make(map[string]userIDSet), + userStreams: make(map[string]*UserStream), + streamLock: &sync.Mutex{}, + lastCleanUpTime: time.Now(), + } +} + +// OnNewEvent is called when a new event is received from the room server. Must only be +// called from a single goroutine, to avoid races between updates which could set the +// current position in the stream incorrectly. +// Can be called either with a *gomatrixserverlib.Event, or with an user ID +func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos types.StreamPosition) { + // update the current position then notify relevant /sync streams. + // This needs to be done PRIOR to waking up users as they will read this value. + n.streamLock.Lock() + defer n.streamLock.Unlock() + n.currPos = pos + + n.removeEmptyUserStreams() + + if ev != nil { + // Map this event's room_id to a list of joined users, and wake them up. + userIDs := n.joinedUsers(ev.RoomID()) + // If this is an invite, also add in the invitee to this list. + if ev.Type() == "m.room.member" && ev.StateKey() != nil { + targetUserID := *ev.StateKey() + membership, err := ev.Membership() + if err != nil { + log.WithError(err).WithField("event_id", ev.EventID()).Errorf( + "Notifier.OnNewEvent: Failed to unmarshal member event", + ) + } else { + // Keep the joined user map up-to-date + switch membership { + case "invite": + userIDs = append(userIDs, targetUserID) + case "join": + // Manually append the new user's ID so they get notified + // along all members in the room + userIDs = append(userIDs, targetUserID) + n.addJoinedUser(ev.RoomID(), targetUserID) + case "leave": + fallthrough + case "ban": + n.removeJoinedUser(ev.RoomID(), targetUserID) + } + } + } + + for _, toNotifyUserID := range userIDs { + n.wakeupUser(toNotifyUserID, pos) + } + } else if len(userID) > 0 { + n.wakeupUser(userID, pos) + } +} + +// GetListener returns a UserStreamListener that can be used to wait for +// updates for a user. Must be closed. +// notify for anything before sincePos +func (n *Notifier) GetListener(req syncRequest) UserStreamListener { + // Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298 + // - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID + // - Incoming events wake requests for a matching room ID + // - Incoming events wake requests for a matching user ID (needed for invites) + + // TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked, + // but given we don't do /events, let's pretend it doesn't exist. + + n.streamLock.Lock() + defer n.streamLock.Unlock() + + n.removeEmptyUserStreams() + + return n.fetchUserStream(req.device.UserID, true).GetListener(req.ctx) +} + +// Load the membership states required to notify users correctly. +func (n *Notifier) Load(ctx context.Context, db *storage.SyncServerDatabase) error { + roomToUsers, err := db.AllJoinedUsersInRooms(ctx) + if err != nil { + return err + } + n.setUsersJoinedToRooms(roomToUsers) + return nil +} + +// CurrentPosition returns the current stream position +func (n *Notifier) CurrentPosition() types.StreamPosition { + return n.currPos +} + +// setUsersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from +// these rooms will wake the given users /sync requests. This should be called prior to ANY calls to +// OnNewEvent (eg on startup) to prevent racing. +func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) { + // This is just the bulk form of addJoinedUser + for roomID, userIDs := range roomIDToUserIDs { + if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { + n.roomIDToJoinedUsers[roomID] = make(userIDSet) + } + for _, userID := range userIDs { + n.roomIDToJoinedUsers[roomID].add(userID) + } + } +} + +func (n *Notifier) wakeupUser(userID string, newPos types.StreamPosition) { + stream := n.fetchUserStream(userID, false) + if stream == nil { + return + } + stream.Broadcast(newPos) // wakeup all goroutines Wait()ing on this stream +} + +// fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true, +// a stream will be made for this user if one doesn't exist and it will be returned. This +// function does not wait for data to be available on the stream. +func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStream { + stream, ok := n.userStreams[userID] + if !ok && makeIfNotExists { + // TODO: Unbounded growth of streams (1 per user) + stream = NewUserStream(userID, n.currPos) + n.userStreams[userID] = stream + } + return stream +} + +// Not thread-safe: must be called on the OnNewEvent goroutine only +func (n *Notifier) addJoinedUser(roomID, userID string) { + if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { + n.roomIDToJoinedUsers[roomID] = make(userIDSet) + } + n.roomIDToJoinedUsers[roomID].add(userID) +} + +// Not thread-safe: must be called on the OnNewEvent goroutine only +func (n *Notifier) removeJoinedUser(roomID, userID string) { + if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { + n.roomIDToJoinedUsers[roomID] = make(userIDSet) + } + n.roomIDToJoinedUsers[roomID].remove(userID) +} + +// Not thread-safe: must be called on the OnNewEvent goroutine only +func (n *Notifier) joinedUsers(roomID string) (userIDs []string) { + if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { + return + } + return n.roomIDToJoinedUsers[roomID].values() +} + +// removeEmptyUserStreams iterates through the user stream map and removes any +// that have been empty for a certain amount of time. This is a crude way of +// ensuring that the userStreams map doesn't grow forver. +// This should be called when the notifier gets called for whatever reason, +// the function itself is responsible for ensuring it doesn't iterate too +// often. +// NB: Callers should have locked the mutex before calling this function. +func (n *Notifier) removeEmptyUserStreams() { + // Only clean up now and again + now := time.Now() + if n.lastCleanUpTime.Add(time.Minute).After(now) { + return + } + n.lastCleanUpTime = now + + deleteBefore := now.Add(-5 * time.Minute) + for key, value := range n.userStreams { + if value.TimeOfLastNonEmpty().Before(deleteBefore) { + delete(n.userStreams, key) + } + } +} + +// A string set, mainly existing for improving clarity of structs in this file. +type userIDSet map[string]bool + +func (s userIDSet) add(str string) { + s[str] = true +} + +func (s userIDSet) remove(str string) { + delete(s, str) +} + +func (s userIDSet) values() (vals []string) { + for str := range s { + vals = append(vals, str) + } + return +} diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go new file mode 100644 index 00000000..4fa54393 --- /dev/null +++ b/syncapi/sync/notifier_test.go @@ -0,0 +1,293 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +var ( + randomMessageEvent gomatrixserverlib.Event + aliceInviteBobEvent gomatrixserverlib.Event + bobLeaveEvent gomatrixserverlib.Event +) + +var ( + streamPositionVeryOld = types.StreamPosition(5) + streamPositionBefore = types.StreamPosition(11) + streamPositionAfter = types.StreamPosition(12) + streamPositionAfter2 = types.StreamPosition(13) + roomID = "!test:localhost" + alice = "@alice:localhost" + bob = "@bob:localhost" +) + +func init() { + var err error + randomMessageEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{ + "type": "m.room.message", + "content": { + "body": "Hello World", + "msgtype": "m.text" + }, + "sender": "@noone:localhost", + "room_id": "`+roomID+`", + "origin_server_ts": 12345, + "event_id": "$randomMessageEvent:localhost" + }`), false) + if err != nil { + panic(err) + } + aliceInviteBobEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{ + "type": "m.room.member", + "state_key": "`+bob+`", + "content": { + "membership": "invite" + }, + "sender": "`+alice+`", + "room_id": "`+roomID+`", + "origin_server_ts": 12345, + "event_id": "$aliceInviteBobEvent:localhost" + }`), false) + if err != nil { + panic(err) + } + bobLeaveEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{ + "type": "m.room.member", + "state_key": "`+bob+`", + "content": { + "membership": "leave" + }, + "sender": "`+bob+`", + "room_id": "`+roomID+`", + "origin_server_ts": 12345, + "event_id": "$bobLeaveEvent:localhost" + }`), false) + if err != nil { + panic(err) + } +} + +// Test that the current position is returned if a request is already behind. +func TestImmediateNotification(t *testing.T) { + n := NewNotifier(streamPositionBefore) + pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionVeryOld)) + if err != nil { + t.Fatalf("TestImmediateNotification error: %s", err) + } + if pos != streamPositionBefore { + t.Fatalf("TestImmediateNotification want %d, got %d", streamPositionBefore, pos) + } +} + +// Test that new events to a joined room unblocks the request. +func TestNewEventAndJoinedToRoom(t *testing.T) { + n := NewNotifier(streamPositionBefore) + n.setUsersJoinedToRooms(map[string][]string{ + roomID: {alice, bob}, + }) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore)) + if err != nil { + t.Errorf("TestNewEventAndJoinedToRoom error: %s", err) + } + if pos != streamPositionAfter { + t.Errorf("TestNewEventAndJoinedToRoom want %d, got %d", streamPositionAfter, pos) + } + wg.Done() + }() + + stream := n.fetchUserStream(bob, true) + waitForBlocking(stream, 1) + + n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter) + + wg.Wait() +} + +// Test that an invite unblocks the request +func TestNewInviteEventForUser(t *testing.T) { + n := NewNotifier(streamPositionBefore) + n.setUsersJoinedToRooms(map[string][]string{ + roomID: {alice, bob}, + }) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore)) + if err != nil { + t.Errorf("TestNewInviteEventForUser error: %s", err) + } + if pos != streamPositionAfter { + t.Errorf("TestNewInviteEventForUser want %d, got %d", streamPositionAfter, pos) + } + wg.Done() + }() + + stream := n.fetchUserStream(bob, true) + waitForBlocking(stream, 1) + + n.OnNewEvent(&aliceInviteBobEvent, "", streamPositionAfter) + + wg.Wait() +} + +// Test that all blocked requests get woken up on a new event. +func TestMultipleRequestWakeup(t *testing.T) { + n := NewNotifier(streamPositionBefore) + n.setUsersJoinedToRooms(map[string][]string{ + roomID: {alice, bob}, + }) + + var wg sync.WaitGroup + wg.Add(3) + poll := func() { + pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore)) + if err != nil { + t.Errorf("TestMultipleRequestWakeup error: %s", err) + } + if pos != streamPositionAfter { + t.Errorf("TestMultipleRequestWakeup want %d, got %d", streamPositionAfter, pos) + } + wg.Done() + } + go poll() + go poll() + go poll() + + stream := n.fetchUserStream(bob, true) + waitForBlocking(stream, 3) + + n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter) + + wg.Wait() + + numWaiting := stream.NumWaiting() + if numWaiting != 0 { + t.Errorf("TestMultipleRequestWakeup NumWaiting() want 0, got %d", numWaiting) + } +} + +// Test that you stop getting woken up when you leave a room. +func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { + // listen as bob. Make bob leave room. Make alice send event to room. + // Make sure alice gets woken up only and not bob as well. + n := NewNotifier(streamPositionBefore) + n.setUsersJoinedToRooms(map[string][]string{ + roomID: {alice, bob}, + }) + + var leaveWG sync.WaitGroup + + // Make bob leave the room + leaveWG.Add(1) + go func() { + pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore)) + if err != nil { + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err) + } + if pos != streamPositionAfter { + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter, pos) + } + leaveWG.Done() + }() + bobStream := n.fetchUserStream(bob, true) + waitForBlocking(bobStream, 1) + n.OnNewEvent(&bobLeaveEvent, "", streamPositionAfter) + leaveWG.Wait() + + // send an event into the room. Make sure alice gets it. Bob should not. + var aliceWG sync.WaitGroup + aliceStream := n.fetchUserStream(alice, true) + aliceWG.Add(1) + go func() { + pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionAfter)) + if err != nil { + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err) + } + if pos != streamPositionAfter2 { + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter2, pos) + } + aliceWG.Done() + }() + + go func() { + // this should timeout with an error (but the main goroutine won't wait for the timeout explicitly) + _, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionAfter)) + if err == nil { + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil") + } + }() + + waitForBlocking(aliceStream, 1) + waitForBlocking(bobStream, 1) + + n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter2) + aliceWG.Wait() + + // it's possible that at this point alice has been informed and bob is about to be informed, so wait + // for a fraction of a second to account for this race + time.Sleep(1 * time.Millisecond) +} + +// same as Notifier.WaitForEvents but with a timeout. +func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) { + listener := n.GetListener(req) + defer listener.Close() + + select { + case <-time.After(5 * time.Second): + return types.StreamPosition(0), fmt.Errorf( + "waitForEvents timed out waiting for %s (pos=%d)", req.device.UserID, req.since, + ) + case <-listener.GetNotifyChannel(*req.since): + p := listener.GetStreamPosition() + return p, nil + } +} + +// Wait until something is Wait()ing on the user stream. +func waitForBlocking(s *UserStream, numBlocking uint) { + for numBlocking != s.NumWaiting() { + // This is horrible but I don't want to add a signalling mechanism JUST for testing. + time.Sleep(1 * time.Microsecond) + } +} + +func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest { + return syncRequest{ + device: authtypes.Device{UserID: userID}, + timeout: 1 * time.Minute, + since: &since, + wantFullState: false, + limit: defaultTimelineLimit, + log: util.GetLogger(context.TODO()), + ctx: context.TODO(), + } +} diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go new file mode 100644 index 00000000..35a15f6f --- /dev/null +++ b/syncapi/sync/request.go @@ -0,0 +1,87 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "context" + "net/http" + "strconv" + "time" + + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/util" + log "github.com/sirupsen/logrus" +) + +const defaultSyncTimeout = time.Duration(0) +const defaultTimelineLimit = 20 + +// syncRequest represents a /sync request, with sensible defaults/sanity checks applied. +type syncRequest struct { + ctx context.Context + device authtypes.Device + limit int + timeout time.Duration + since *types.StreamPosition // nil means that no since token was supplied + wantFullState bool + log *log.Entry +} + +func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, error) { + timeout := getTimeout(req.URL.Query().Get("timeout")) + fullState := req.URL.Query().Get("full_state") + wantFullState := fullState != "" && fullState != "false" + since, err := getSyncStreamPosition(req.URL.Query().Get("since")) + if err != nil { + return nil, err + } + // TODO: Additional query params: set_presence, filter + return &syncRequest{ + ctx: req.Context(), + device: device, + timeout: timeout, + since: since, + wantFullState: wantFullState, + limit: defaultTimelineLimit, // TODO: read from filter + log: util.GetLogger(req.Context()), + }, nil +} + +func getTimeout(timeoutMS string) time.Duration { + if timeoutMS == "" { + return defaultSyncTimeout + } + i, err := strconv.Atoi(timeoutMS) + if err != nil { + return defaultSyncTimeout + } + return time.Duration(i) * time.Millisecond +} + +// getSyncStreamPosition tries to parse a 'since' token taken from the API to a +// stream position. If the string is empty then (nil, nil) is returned. +func getSyncStreamPosition(since string) (*types.StreamPosition, error) { + if since == "" { + return nil, nil + } + i, err := strconv.Atoi(since) + if err != nil { + return nil, err + } + token := types.StreamPosition(i) + return &token, nil +} diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go new file mode 100644 index 00000000..89137eb5 --- /dev/null +++ b/syncapi/sync/requestpool.go @@ -0,0 +1,216 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "net/http" + "time" + + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + log "github.com/sirupsen/logrus" +) + +// RequestPool manages HTTP long-poll connections for /sync +type RequestPool struct { + db *storage.SyncServerDatabase + accountDB *accounts.Database + notifier *Notifier +} + +// NewRequestPool makes a new RequestPool +func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier, adb *accounts.Database) *RequestPool { + return &RequestPool{db, adb, n} +} + +// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be +// called in a dedicated goroutine for this request. This function will block the goroutine +// until a response is ready, or it times out. +func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtypes.Device) util.JSONResponse { + var syncData *types.Response + + // Extract values from request + logger := util.GetLogger(req.Context()) + userID := device.UserID + syncReq, err := newSyncRequest(req, *device) + if err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.Unknown(err.Error()), + } + } + logger.WithFields(log.Fields{ + "userID": userID, + "since": syncReq.since, + "timeout": syncReq.timeout, + }).Info("Incoming /sync request") + + currPos := rp.notifier.CurrentPosition() + + // If this is an initial sync or timeout=0 we return immediately + if syncReq.since == nil || syncReq.timeout == 0 { + syncData, err = rp.currentSyncForUser(*syncReq, currPos) + if err != nil { + return httputil.LogThenError(req, err) + } + return util.JSONResponse{ + Code: http.StatusOK, + JSON: syncData, + } + } + + // Otherwise, we wait for the notifier to tell us if something *may* have + // happened. We loop in case it turns out that nothing did happen. + + timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above + defer timer.Stop() + + userStreamListener := rp.notifier.GetListener(*syncReq) + defer userStreamListener.Close() + + // We need the loop in case userStreamListener wakes up even if there isn't + // anything to send down. In this case, we'll jump out of the select but + // don't want to send anything back until we get some actual content to + // respond with, so we skip the return an go back to waiting for content to + // be sent down or the request timing out. + var hasTimedOut bool + for { + select { + // Wait for notifier to wake us up + case <-userStreamListener.GetNotifyChannel(currPos): + currPos = userStreamListener.GetStreamPosition() + // Or for timeout to expire + case <-timer.C: + // We just need to ensure we get out of the select after reaching the + // timeout, but there's nothing specific we want to do in this case + // apart from that, so we do nothing except stating we're timing out + // and need to respond. + hasTimedOut = true + // Or for the request to be cancelled + case <-req.Context().Done(): + return httputil.LogThenError(req, req.Context().Err()) + } + + // Note that we don't time out during calculation of sync + // response. This ensures that we don't waste the hard work + // of calculating the sync only to get timed out before we + // can respond + + syncData, err = rp.currentSyncForUser(*syncReq, currPos) + if err != nil { + return httputil.LogThenError(req, err) + } + + if !syncData.IsEmpty() || hasTimedOut { + return util.JSONResponse{ + Code: http.StatusOK, + JSON: syncData, + } + } + } +} + +func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) { + // TODO: handle ignored users + if req.since == nil { + res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit) + } else { + res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, currentPos, req.limit) + } + + if err != nil { + return + } + + res, err = rp.appendAccountData(res, req.device.UserID, req, currentPos) + return +} + +func (rp *RequestPool) appendAccountData( + data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition, +) (*types.Response, error) { + // TODO: Account data doesn't have a sync position of its own, meaning that + // account data might be sent multiple time to the client if multiple account + // data keys were set between two message. This isn't a huge issue since the + // duplicate data doesn't represent a huge quantity of data, but an optimisation + // here would be making sure each data is sent only once to the client. + localpart, _, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return nil, err + } + + if req.since == nil { + // If this is the initial sync, we don't need to check if a data has + // already been sent. Instead, we send the whole batch. + var global []gomatrixserverlib.ClientEvent + var rooms map[string][]gomatrixserverlib.ClientEvent + global, rooms, err = rp.accountDB.GetAccountData(req.ctx, localpart) + if err != nil { + return nil, err + } + data.AccountData.Events = global + + for r, j := range data.Rooms.Join { + if len(rooms[r]) > 0 { + j.AccountData.Events = rooms[r] + data.Rooms.Join[r] = j + } + } + + return data, nil + } + + // Sync is not initial, get all account data since the latest sync + dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, *req.since, currentPos) + if err != nil { + return nil, err + } + + if len(dataTypes) == 0 { + return data, nil + } + + // Iterate over the rooms + for roomID, dataTypes := range dataTypes { + events := []gomatrixserverlib.ClientEvent{} + // Request the missing data from the database + for _, dataType := range dataTypes { + evs, err := rp.accountDB.GetAccountDataByType( + req.ctx, localpart, roomID, dataType, + ) + if err != nil { + return nil, err + } + events = append(events, evs...) + } + + // Append the data to the response + if len(roomID) > 0 { + jr := data.Rooms.Join[roomID] + jr.AccountData.Events = events + data.Rooms.Join[roomID] = jr + } else { + data.AccountData.Events = events + } + } + + return data, nil +} diff --git a/syncapi/sync/userstream.go b/syncapi/sync/userstream.go new file mode 100644 index 00000000..77d09c20 --- /dev/null +++ b/syncapi/sync/userstream.go @@ -0,0 +1,162 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "context" + "runtime" + "sync" + "time" + + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/util" +) + +// UserStream represents a communication mechanism between the /sync request goroutine +// and the underlying sync server goroutines. +// Goroutines can get a UserStreamListener to wait for updates, and can Broadcast() +// updates. +type UserStream struct { + UserID string + // The lock that protects changes to this struct + lock sync.Mutex + // Closed when there is an update. + signalChannel chan struct{} + // The last stream position that there may have been an update for the suser + pos types.StreamPosition + // The last time when we had some listeners waiting + timeOfLastChannel time.Time + // The number of listeners waiting + numWaiting uint +} + +// UserStreamListener allows a sync request to wait for updates for a user. +type UserStreamListener struct { + userStream *UserStream + + // Whether the stream has been closed + hasClosed bool +} + +// NewUserStream creates a new user stream +func NewUserStream(userID string, currPos types.StreamPosition) *UserStream { + return &UserStream{ + UserID: userID, + timeOfLastChannel: time.Now(), + pos: currPos, + signalChannel: make(chan struct{}), + } +} + +// GetListener returns UserStreamListener that a sync request can use to wait +// for new updates with. +// UserStreamListener must be closed +func (s *UserStream) GetListener(ctx context.Context) UserStreamListener { + s.lock.Lock() + defer s.lock.Unlock() + + s.numWaiting++ // We decrement when UserStreamListener is closed + + listener := UserStreamListener{ + userStream: s, + } + + // Lets be a bit paranoid here and check that Close() is being called + runtime.SetFinalizer(&listener, func(l *UserStreamListener) { + if !l.hasClosed { + util.GetLogger(ctx).Warn("Didn't call Close on UserStreamListener") + l.Close() + } + }) + + return listener +} + +// Broadcast a new stream position for this user. +func (s *UserStream) Broadcast(pos types.StreamPosition) { + s.lock.Lock() + defer s.lock.Unlock() + + s.pos = pos + + close(s.signalChannel) + + s.signalChannel = make(chan struct{}) +} + +// NumWaiting returns the number of goroutines waiting for waiting for updates. +// Used for metrics and testing. +func (s *UserStream) NumWaiting() uint { + s.lock.Lock() + defer s.lock.Unlock() + return s.numWaiting +} + +// TimeOfLastNonEmpty returns the last time that the number of waiting listeners +// was non-empty, may be time.Now() if number of waiting listeners is currently +// non-empty. +func (s *UserStream) TimeOfLastNonEmpty() time.Time { + s.lock.Lock() + defer s.lock.Unlock() + + if s.numWaiting > 0 { + return time.Now() + } + + return s.timeOfLastChannel +} + +// GetStreamPosition returns last stream position which the UserStream was +// notified about +func (s *UserStreamListener) GetStreamPosition() types.StreamPosition { + s.userStream.lock.Lock() + defer s.userStream.lock.Unlock() + + return s.userStream.pos +} + +// GetNotifyChannel returns a channel that is closed when there may be an +// update for the user. +// sincePos specifies from which point we want to be notified about. If there +// has already been an update after sincePos we'll return a closed channel +// immediately. +func (s *UserStreamListener) GetNotifyChannel(sincePos types.StreamPosition) <-chan struct{} { + s.userStream.lock.Lock() + defer s.userStream.lock.Unlock() + + if sincePos < s.userStream.pos { + // If the listener is behind, i.e. missed a potential update, then we + // want them to wake up immediately. We do this by returning a new + // closed stream, which returns immediately when selected. + closedChannel := make(chan struct{}) + close(closedChannel) + return closedChannel + } + + return s.userStream.signalChannel +} + +// Close cleans up resources used +func (s *UserStreamListener) Close() { + s.userStream.lock.Lock() + defer s.userStream.lock.Unlock() + + if !s.hasClosed { + s.userStream.numWaiting-- + s.userStream.timeOfLastChannel = time.Now() + } + + s.hasClosed = true +} diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go new file mode 100644 index 00000000..2db54c3c --- /dev/null +++ b/syncapi/syncapi.go @@ -0,0 +1,75 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncapi + +import ( + "context" + + "github.com/sirupsen/logrus" + + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/common/basecomponent" + "github.com/matrix-org/dendrite/roomserver/api" + + "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" + "github.com/matrix-org/dendrite/syncapi/consumers" + "github.com/matrix-org/dendrite/syncapi/routing" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/sync" + "github.com/matrix-org/dendrite/syncapi/types" +) + +// SetupSyncAPIComponent sets up and registers HTTP handlers for the SyncAPI +// component. +func SetupSyncAPIComponent( + base *basecomponent.BaseDendrite, + deviceDB *devices.Database, + accountsDB *accounts.Database, + queryAPI api.RoomserverQueryAPI, +) { + syncDB, err := storage.NewSyncServerDatabase(string(base.Cfg.Database.SyncAPI)) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to sync db") + } + + pos, err := syncDB.SyncStreamPosition(context.Background()) + if err != nil { + logrus.WithError(err).Panicf("failed to get stream position") + } + + notifier := sync.NewNotifier(types.StreamPosition(pos)) + err = notifier.Load(context.Background(), syncDB) + if err != nil { + logrus.WithError(err).Panicf("failed to start notifier") + } + + requestPool := sync.NewRequestPool(syncDB, notifier, accountsDB) + + roomConsumer := consumers.NewOutputRoomEventConsumer( + base.Cfg, base.KafkaConsumer, notifier, syncDB, queryAPI, + ) + if err = roomConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start room server consumer") + } + + clientConsumer := consumers.NewOutputClientDataConsumer( + base.Cfg, base.KafkaConsumer, notifier, syncDB, + ) + if err = clientConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start client data consumer") + } + + routing.Setup(base.APIMux, requestPool, syncDB, deviceDB) +} diff --git a/syncapi/types/types.go b/syncapi/types/types.go new file mode 100644 index 00000000..d0b1c38a --- /dev/null +++ b/syncapi/types/types.go @@ -0,0 +1,147 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +import ( + "encoding/json" + "strconv" + + "github.com/matrix-org/gomatrixserverlib" +) + +// StreamPosition represents the offset in the sync stream a client is at. +type StreamPosition int64 + +// String implements the Stringer interface. +func (sp StreamPosition) String() string { + return strconv.FormatInt(int64(sp), 10) +} + +// PrevEventRef represents a reference to a previous event in a state event upgrade +type PrevEventRef struct { + PrevContent json.RawMessage `json:"prev_content"` + ReplacesState string `json:"replaces_state"` + PrevSender string `json:"prev_sender"` +} + +// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync +type Response struct { + NextBatch string `json:"next_batch"` + AccountData struct { + Events []gomatrixserverlib.ClientEvent `json:"events"` + } `json:"account_data"` + Presence struct { + Events []gomatrixserverlib.ClientEvent `json:"events"` + } `json:"presence"` + Rooms struct { + Join map[string]JoinResponse `json:"join"` + Invite map[string]InviteResponse `json:"invite"` + Leave map[string]LeaveResponse `json:"leave"` + } `json:"rooms"` +} + +// NewResponse creates an empty response with initialised maps. +func NewResponse(pos StreamPosition) *Response { + res := Response{} + // Make sure we send the next_batch as a string. We don't want to confuse clients by sending this + // as an integer even though (at the moment) it is. + res.NextBatch = pos.String() + // Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section, + // so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors. + res.Rooms.Join = make(map[string]JoinResponse) + res.Rooms.Invite = make(map[string]InviteResponse) + res.Rooms.Leave = make(map[string]LeaveResponse) + + // Also pre-intialise empty slices or else we'll insert 'null' instead of '[]' for the value. + // TODO: We really shouldn't have to do all this to coerce encoding/json to Do The Right Thing. We should + // really be using our own Marshal/Unmarshal implementations otherwise this may prove to be a CPU bottleneck. + // This also applies to NewJoinResponse, NewInviteResponse and NewLeaveResponse. + res.AccountData.Events = make([]gomatrixserverlib.ClientEvent, 0) + res.Presence.Events = make([]gomatrixserverlib.ClientEvent, 0) + + return &res +} + +// IsEmpty returns true if the response is empty, i.e. used to decided whether +// to return the response immediately to the client or to wait for more data. +func (r *Response) IsEmpty() bool { + return len(r.Rooms.Join) == 0 && + len(r.Rooms.Invite) == 0 && + len(r.Rooms.Leave) == 0 && + len(r.AccountData.Events) == 0 && + len(r.Presence.Events) == 0 +} + +// JoinResponse represents a /sync response for a room which is under the 'join' key. +type JoinResponse struct { + State struct { + Events []gomatrixserverlib.ClientEvent `json:"events"` + } `json:"state"` + Timeline struct { + Events []gomatrixserverlib.ClientEvent `json:"events"` + Limited bool `json:"limited"` + PrevBatch string `json:"prev_batch"` + } `json:"timeline"` + Ephemeral struct { + Events []gomatrixserverlib.ClientEvent `json:"events"` + } `json:"ephemeral"` + AccountData struct { + Events []gomatrixserverlib.ClientEvent `json:"events"` + } `json:"account_data"` +} + +// NewJoinResponse creates an empty response with initialised arrays. +func NewJoinResponse() *JoinResponse { + res := JoinResponse{} + res.State.Events = make([]gomatrixserverlib.ClientEvent, 0) + res.Timeline.Events = make([]gomatrixserverlib.ClientEvent, 0) + res.Ephemeral.Events = make([]gomatrixserverlib.ClientEvent, 0) + res.AccountData.Events = make([]gomatrixserverlib.ClientEvent, 0) + return &res +} + +// InviteResponse represents a /sync response for a room which is under the 'invite' key. +type InviteResponse struct { + InviteState struct { + Events []gomatrixserverlib.ClientEvent `json:"events"` + } `json:"invite_state"` +} + +// NewInviteResponse creates an empty response with initialised arrays. +func NewInviteResponse() *InviteResponse { + res := InviteResponse{} + res.InviteState.Events = make([]gomatrixserverlib.ClientEvent, 0) + return &res +} + +// LeaveResponse represents a /sync response for a room which is under the 'leave' key. +type LeaveResponse struct { + State struct { + Events []gomatrixserverlib.ClientEvent `json:"events"` + } `json:"state"` + Timeline struct { + Events []gomatrixserverlib.ClientEvent `json:"events"` + Limited bool `json:"limited"` + PrevBatch string `json:"prev_batch"` + } `json:"timeline"` +} + +// NewLeaveResponse creates an empty response with initialised arrays. +func NewLeaveResponse() *LeaveResponse { + res := LeaveResponse{} + res.State.Events = make([]gomatrixserverlib.ClientEvent, 0) + res.Timeline.Events = make([]gomatrixserverlib.ClientEvent, 0) + return &res +} |