aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorruben <code@rbn.im>2019-05-21 22:56:55 +0200
committerBrendan Abolivier <babolivier@matrix.org>2019-05-21 21:56:55 +0100
commit74827428bd3e11faab65f12204449c1b9469b0ae (patch)
tree0decafa542436a0667ed2d3e3cfd4df0f03de1e5 /syncapi
parent4d588f7008afe5600219ac0930c2eee2de5c447b (diff)
use go module for dependencies (#594)
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/README.md86
-rw-r--r--syncapi/consumers/clientapi.go95
-rw-r--r--syncapi/consumers/roomserver.go286
-rw-r--r--syncapi/routing/routing.go61
-rw-r--r--syncapi/routing/state.go118
-rw-r--r--syncapi/storage/account_data_table.go141
-rw-r--r--syncapi/storage/current_room_state_table.go249
-rw-r--r--syncapi/storage/invites_table.go133
-rw-r--r--syncapi/storage/output_room_events_table.go294
-rw-r--r--syncapi/storage/syncserver.go695
-rw-r--r--syncapi/sync/notifier.go243
-rw-r--r--syncapi/sync/notifier_test.go293
-rw-r--r--syncapi/sync/request.go87
-rw-r--r--syncapi/sync/requestpool.go216
-rw-r--r--syncapi/sync/userstream.go162
-rw-r--r--syncapi/syncapi.go75
-rw-r--r--syncapi/types/types.go147
17 files changed, 3381 insertions, 0 deletions
diff --git a/syncapi/README.md b/syncapi/README.md
new file mode 100644
index 00000000..7221b22d
--- /dev/null
+++ b/syncapi/README.md
@@ -0,0 +1,86 @@
+# Sync API Server
+
+This server is responsible for servicing `/sync` requests. It gets its data from the room server output log. Currently, the sync server will:
+ - Return a valid `/sync` response for the user represented by the provided `access_token`.
+ - Return a "complete sync" if no `since` value is provided, and return a valid `next_batch` token. This contains all rooms the user has been invited to or has joined. For joined rooms, this includes the complete current room state and the most recent 20 (hard-coded) events in the timeline.
+ - For "incremental syncs" (a `since` value is provided), as you get invited to, join, or leave rooms they will be reflected correctly in the `/sync` response.
+ - For very large state deltas, the `state` section of a room is correctly populated with the state of the room at the *start* of the timeline.
+ - When you join a room, the `/sync` which transitions your client to be "joined" will include the complete current room state as per the specification.
+ - Only wake up user streams it needs to wake up.
+ - Honours the `timeout` query parameter value.
+
+## Internals
+
+When the server gets a `/sync` request, it needs to:
+ - Work out *which* rooms to return to the client.
+ - For each room, work out *which* events to return to the client.
+
+The logic for working out which rooms is based on [Synapse](https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821):
+ 1) Get the CURRENT joined room list for this user.
+ 2) Get membership list changes for this user between the provided stream position and now.
+ 3) For each room which has membership list changes:
+ - Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins).
+ If it is, then we need to send the full room state down (and 'limited' is always true).
+ - Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
+ - Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block.
+ 4) Add joined rooms (joined room list)
+
+For each room, the /sync response returns the most recent timeline events and the state of the room at the start of the timeline.
+The logic for working out *which* events is not based entirely on Synapse code, as it is known broken with respect to working out
+room state. In order to know which events to return, the server needs to calculate room state at various points in the history of
+the room. For example, imagine a room with the following 15 events (letters are state events (updated via `'`), numbers are timeline events):
+
+```
+index 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 (1-based indexing as StreamPosition(0) represents no event)
+timeline [A, B, C, D, 1, 2, 3, D', 4, D'', 5, B', D''', D'''', 6]
+```
+
+The current state of this room is: `[A, B', C, D'''']`.
+
+If this room was requested with `?since=14&limit=5` then 1 timeline event would be returned, the most recent one:
+```
+ 15
+ [ 6 ]
+```
+
+If this room was requested with `?since=9&limit=5` then 5 timeline events would be returned, the most recent ones:
+```
+ 11 12 13 14 15
+ [5, B', D''', D'''', 6]
+```
+
+The state of the room at the START of the timeline can be represented in 2 ways:
+ - The `full_state` from index 0 : `[A, B, C, D'']` (aka the state between 0-11 exclusive)
+ - A partial state from index 9 : `[D'']` (aka the state between 9-11 exclusive)
+
+Servers advance state events (e.g from `D'` to `D''`) based on the state conflict resolution algorithm.
+You might think that you could advance the current state by just updating the entry for the `(event type, state_key)` tuple
+for each state event, but this state can diverge from the state calculated using the state conflict resolution algorithm.
+For example, if there are two "simultaneous" updates to the same state key, that is two updates at the same depth in the
+event graph, then the final result of the state conflict resolution algorithm might not match the order the events appear
+in the timeline.
+
+The correct advancement for state events is represented by the `AddsStateEventIDs` and `RemovesStateEventIDs` that
+are in `OutputRoomEvents` from the room server.
+
+This version of the sync server uses very simple indexing to calculate room state at various points.
+This is inefficient when a very old `since` value is provided, or the `full_state` is requested, as the state delta becomes
+very large. This is mitigated slightly with indexes, but better data structures could be used in the future.
+
+## Known Issues
+
+- `m.room.history_visibility` is not honoured: it is always treated as "shared".
+- All ephemeral events are not implemented (presence, typing, receipts).
+- Account data (both user and room) is not implemented.
+- `to_device` messages are not implemented.
+- Back-pagination via `prev_batch` is not implemented.
+- The `limited` flag can lie.
+- Filters are not honoured or implemented. The `limit` for each room is hard-coded to 20.
+- The `full_state` query parameter is not implemented.
+- The `set_presence` query parameter is not implemented.
+- "Ignored" users are not ignored.
+- Redacted events are still sent to clients.
+- Invites over federation (if it existed) won't work as they aren't "real" events and so won't be in the right tables.
+- `invite_state` is not implemented (for similar reasons to the above point).
+- The current implementation scales badly when a very old `since` token is provided.
+- The entire current room state can be re-sent to the client if they send a duplicate "join" event which should be a no-op.
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
new file mode 100644
index 00000000..d05a7692
--- /dev/null
+++ b/syncapi/consumers/clientapi.go
@@ -0,0 +1,95 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/common/config"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/sync"
+ log "github.com/sirupsen/logrus"
+ sarama "gopkg.in/Shopify/sarama.v1"
+)
+
+// OutputClientDataConsumer consumes events that originated in the client API server.
+type OutputClientDataConsumer struct {
+ clientAPIConsumer *common.ContinualConsumer
+ db *storage.SyncServerDatabase
+ notifier *sync.Notifier
+}
+
+// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
+func NewOutputClientDataConsumer(
+ cfg *config.Dendrite,
+ kafkaConsumer sarama.Consumer,
+ n *sync.Notifier,
+ store *storage.SyncServerDatabase,
+) *OutputClientDataConsumer {
+
+ consumer := common.ContinualConsumer{
+ Topic: string(cfg.Kafka.Topics.OutputClientData),
+ Consumer: kafkaConsumer,
+ PartitionStore: store,
+ }
+ s := &OutputClientDataConsumer{
+ clientAPIConsumer: &consumer,
+ db: store,
+ notifier: n,
+ }
+ consumer.ProcessMessage = s.onMessage
+
+ return s
+}
+
+// Start consuming from room servers
+func (s *OutputClientDataConsumer) Start() error {
+ return s.clientAPIConsumer.Start()
+}
+
+// onMessage is called when the sync server receives a new event from the client API server output log.
+// It is not safe for this function to be called from multiple goroutines, or else the
+// sync stream position may race and be incorrectly calculated.
+func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error {
+ // Parse out the event JSON
+ var output common.AccountData
+ if err := json.Unmarshal(msg.Value, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("client API server output log: message parse failure")
+ return nil
+ }
+
+ log.WithFields(log.Fields{
+ "type": output.Type,
+ "room_id": output.RoomID,
+ }).Info("received data from client API server")
+
+ syncStreamPos, err := s.db.UpsertAccountData(
+ context.TODO(), string(msg.Key), output.RoomID, output.Type,
+ )
+ if err != nil {
+ log.WithFields(log.Fields{
+ "type": output.Type,
+ "room_id": output.RoomID,
+ log.ErrorKey: err,
+ }).Panicf("could not save account data")
+ }
+
+ s.notifier.OnNewEvent(nil, string(msg.Key), syncStreamPos)
+
+ return nil
+}
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
new file mode 100644
index 00000000..273b6aea
--- /dev/null
+++ b/syncapi/consumers/roomserver.go
@@ -0,0 +1,286 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/common/config"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/sync"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ log "github.com/sirupsen/logrus"
+ sarama "gopkg.in/Shopify/sarama.v1"
+)
+
+// OutputRoomEventConsumer consumes events that originated in the room server.
+type OutputRoomEventConsumer struct {
+ roomServerConsumer *common.ContinualConsumer
+ db *storage.SyncServerDatabase
+ notifier *sync.Notifier
+ query api.RoomserverQueryAPI
+}
+
+// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
+func NewOutputRoomEventConsumer(
+ cfg *config.Dendrite,
+ kafkaConsumer sarama.Consumer,
+ n *sync.Notifier,
+ store *storage.SyncServerDatabase,
+ queryAPI api.RoomserverQueryAPI,
+) *OutputRoomEventConsumer {
+
+ consumer := common.ContinualConsumer{
+ Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
+ Consumer: kafkaConsumer,
+ PartitionStore: store,
+ }
+ s := &OutputRoomEventConsumer{
+ roomServerConsumer: &consumer,
+ db: store,
+ notifier: n,
+ query: queryAPI,
+ }
+ consumer.ProcessMessage = s.onMessage
+
+ return s
+}
+
+// Start consuming from room servers
+func (s *OutputRoomEventConsumer) Start() error {
+ return s.roomServerConsumer.Start()
+}
+
+// onMessage is called when the sync server receives a new event from the room server output log.
+// It is not safe for this function to be called from multiple goroutines, or else the
+// sync stream position may race and be incorrectly calculated.
+func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
+ // Parse out the event JSON
+ var output api.OutputEvent
+ if err := json.Unmarshal(msg.Value, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("roomserver output log: message parse failure")
+ return nil
+ }
+
+ switch output.Type {
+ case api.OutputTypeNewRoomEvent:
+ return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
+ case api.OutputTypeNewInviteEvent:
+ return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
+ case api.OutputTypeRetireInviteEvent:
+ return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent)
+ default:
+ log.WithField("type", output.Type).Debug(
+ "roomserver output log: ignoring unknown output type",
+ )
+ return nil
+ }
+}
+
+func (s *OutputRoomEventConsumer) onNewRoomEvent(
+ ctx context.Context, msg api.OutputNewRoomEvent,
+) error {
+ ev := msg.Event
+ log.WithFields(log.Fields{
+ "event_id": ev.EventID(),
+ "room_id": ev.RoomID(),
+ }).Info("received event from roomserver")
+
+ addsStateEvents, err := s.lookupStateEvents(msg.AddsStateEventIDs, ev)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "event": string(ev.JSON()),
+ log.ErrorKey: err,
+ "add": msg.AddsStateEventIDs,
+ "del": msg.RemovesStateEventIDs,
+ }).Panicf("roomserver output log: state event lookup failure")
+ }
+
+ ev, err = s.updateStateEvent(ev)
+ if err != nil {
+ return err
+ }
+
+ for i := range addsStateEvents {
+ addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i])
+ if err != nil {
+ return err
+ }
+ }
+
+ syncStreamPos, err := s.db.WriteEvent(
+ ctx,
+ &ev,
+ addsStateEvents,
+ msg.AddsStateEventIDs,
+ msg.RemovesStateEventIDs,
+ msg.TransactionID,
+ )
+ if err != nil {
+ return err
+ }
+
+ if err != nil {
+ // panic rather than continue with an inconsistent database
+ log.WithFields(log.Fields{
+ "event": string(ev.JSON()),
+ log.ErrorKey: err,
+ "add": msg.AddsStateEventIDs,
+ "del": msg.RemovesStateEventIDs,
+ }).Panicf("roomserver output log: write event failure")
+ return nil
+ }
+ s.notifier.OnNewEvent(&ev, "", types.StreamPosition(syncStreamPos))
+
+ return nil
+}
+
+func (s *OutputRoomEventConsumer) onNewInviteEvent(
+ ctx context.Context, msg api.OutputNewInviteEvent,
+) error {
+ syncStreamPos, err := s.db.AddInviteEvent(ctx, msg.Event)
+ if err != nil {
+ // panic rather than continue with an inconsistent database
+ log.WithFields(log.Fields{
+ "event": string(msg.Event.JSON()),
+ log.ErrorKey: err,
+ }).Panicf("roomserver output log: write invite failure")
+ return nil
+ }
+ s.notifier.OnNewEvent(&msg.Event, "", syncStreamPos)
+ return nil
+}
+
+func (s *OutputRoomEventConsumer) onRetireInviteEvent(
+ ctx context.Context, msg api.OutputRetireInviteEvent,
+) error {
+ err := s.db.RetireInviteEvent(ctx, msg.EventID)
+ if err != nil {
+ // panic rather than continue with an inconsistent database
+ log.WithFields(log.Fields{
+ "event_id": msg.EventID,
+ log.ErrorKey: err,
+ }).Panicf("roomserver output log: remove invite failure")
+ return nil
+ }
+ // TODO: Notify any active sync requests that the invite has been retired.
+ // s.notifier.OnNewEvent(nil, msg.TargetUserID, syncStreamPos)
+ return nil
+}
+
+// lookupStateEvents looks up the state events that are added by a new event.
+func (s *OutputRoomEventConsumer) lookupStateEvents(
+ addsStateEventIDs []string, event gomatrixserverlib.Event,
+) ([]gomatrixserverlib.Event, error) {
+ // Fast path if there aren't any new state events.
+ if len(addsStateEventIDs) == 0 {
+ return nil, nil
+ }
+
+ // Fast path if the only state event added is the event itself.
+ if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
+ return []gomatrixserverlib.Event{event}, nil
+ }
+
+ // Check if this is re-adding a state events that we previously processed
+ // If we have previously received a state event it may still be in
+ // our event database.
+ result, err := s.db.Events(context.TODO(), addsStateEventIDs)
+ if err != nil {
+ return nil, err
+ }
+ missing := missingEventsFrom(result, addsStateEventIDs)
+
+ // Check if event itself is being added.
+ for _, eventID := range missing {
+ if eventID == event.EventID() {
+ result = append(result, event)
+ break
+ }
+ }
+ missing = missingEventsFrom(result, addsStateEventIDs)
+
+ if len(missing) == 0 {
+ return result, nil
+ }
+
+ // At this point the missing events are neither the event itself nor are
+ // they present in our local database. Our only option is to fetch them
+ // from the roomserver using the query API.
+ eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
+ var eventResp api.QueryEventsByIDResponse
+ if err := s.query.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
+ return nil, err
+ }
+
+ result = append(result, eventResp.Events...)
+ missing = missingEventsFrom(result, addsStateEventIDs)
+
+ if len(missing) != 0 {
+ return nil, fmt.Errorf(
+ "missing %d state events IDs at event %q", len(missing), event.EventID(),
+ )
+ }
+
+ return result, nil
+}
+
+func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.Event) (gomatrixserverlib.Event, error) {
+ var stateKey string
+ if event.StateKey() == nil {
+ stateKey = ""
+ } else {
+ stateKey = *event.StateKey()
+ }
+
+ prevEvent, err := s.db.GetStateEvent(
+ context.TODO(), event.Type(), event.RoomID(), stateKey,
+ )
+ if err != nil {
+ return event, err
+ }
+
+ if prevEvent == nil {
+ return event, nil
+ }
+
+ prev := types.PrevEventRef{
+ PrevContent: prevEvent.Content(),
+ ReplacesState: prevEvent.EventID(),
+ PrevSender: prevEvent.Sender(),
+ }
+
+ return event.SetUnsigned(prev)
+}
+
+func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string {
+ have := map[string]bool{}
+ for _, event := range events {
+ have[event.EventID()] = true
+ }
+ var missing []string
+ for _, eventID := range required {
+ if !have[eventID] {
+ missing = append(missing, eventID)
+ }
+ }
+ return missing
+}
diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go
new file mode 100644
index 00000000..93d939c3
--- /dev/null
+++ b/syncapi/routing/routing.go
@@ -0,0 +1,61 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package routing
+
+import (
+ "net/http"
+
+ "github.com/gorilla/mux"
+ "github.com/matrix-org/dendrite/clientapi/auth"
+ "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
+ "github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/sync"
+ "github.com/matrix-org/util"
+)
+
+const pathPrefixR0 = "/_matrix/client/r0"
+
+// Setup configures the given mux with sync-server listeners
+func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServerDatabase, deviceDB *devices.Database) {
+ r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
+
+ authData := auth.Data{
+ AccountDB: nil,
+ DeviceDB: deviceDB,
+ AppServices: nil,
+ }
+
+ // TODO: Add AS support for all handlers below.
+ r0mux.Handle("/sync", common.MakeAuthAPI("sync", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
+ return srp.OnIncomingSyncRequest(req, device)
+ })).Methods(http.MethodGet, http.MethodOptions)
+
+ r0mux.Handle("/rooms/{roomID}/state", common.MakeAuthAPI("room_state", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
+ vars := mux.Vars(req)
+ return OnIncomingStateRequest(req, syncDB, vars["roomID"])
+ })).Methods(http.MethodGet, http.MethodOptions)
+
+ r0mux.Handle("/rooms/{roomID}/state/{type}", common.MakeAuthAPI("room_state", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
+ vars := mux.Vars(req)
+ return OnIncomingStateTypeRequest(req, syncDB, vars["roomID"], vars["type"], "")
+ })).Methods(http.MethodGet, http.MethodOptions)
+
+ r0mux.Handle("/rooms/{roomID}/state/{type}/{stateKey}", common.MakeAuthAPI("room_state", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
+ vars := mux.Vars(req)
+ return OnIncomingStateTypeRequest(req, syncDB, vars["roomID"], vars["type"], vars["stateKey"])
+ })).Methods(http.MethodGet, http.MethodOptions)
+}
diff --git a/syncapi/routing/state.go b/syncapi/routing/state.go
new file mode 100644
index 00000000..6b98a0b7
--- /dev/null
+++ b/syncapi/routing/state.go
@@ -0,0 +1,118 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package routing
+
+import (
+ "encoding/json"
+ "net/http"
+
+ "github.com/matrix-org/dendrite/clientapi/httputil"
+ "github.com/matrix-org/dendrite/clientapi/jsonerror"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ log "github.com/sirupsen/logrus"
+)
+
+type stateEventInStateResp struct {
+ gomatrixserverlib.ClientEvent
+ PrevContent json.RawMessage `json:"prev_content,omitempty"`
+ ReplacesState string `json:"replaces_state,omitempty"`
+}
+
+// OnIncomingStateRequest is called when a client makes a /rooms/{roomID}/state
+// request. It will fetch all the state events from the specified room and will
+// append the necessary keys to them if applicable before returning them.
+// Returns an error if something went wrong in the process.
+// TODO: Check if the user is in the room. If not, check if the room's history
+// is publicly visible. Current behaviour is returning an empty array if the
+// user cannot see the room's history.
+func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatabase, roomID string) util.JSONResponse {
+ // TODO(#287): Auth request and handle the case where the user has left (where
+ // we should return the state at the poin they left)
+
+ stateEvents, err := db.GetStateEventsForRoom(req.Context(), roomID)
+ if err != nil {
+ return httputil.LogThenError(req, err)
+ }
+
+ resp := []stateEventInStateResp{}
+ // Fill the prev_content and replaces_state keys if necessary
+ for _, event := range stateEvents {
+ stateEvent := stateEventInStateResp{
+ ClientEvent: gomatrixserverlib.ToClientEvent(event, gomatrixserverlib.FormatAll),
+ }
+ var prevEventRef types.PrevEventRef
+ if len(event.Unsigned()) > 0 {
+ if err := json.Unmarshal(event.Unsigned(), &prevEventRef); err != nil {
+ return httputil.LogThenError(req, err)
+ }
+ // Fills the previous state event ID if the state event replaces another
+ // state event
+ if len(prevEventRef.ReplacesState) > 0 {
+ stateEvent.ReplacesState = prevEventRef.ReplacesState
+ }
+ // Fill the previous event if the state event references a previous event
+ if prevEventRef.PrevContent != nil {
+ stateEvent.PrevContent = prevEventRef.PrevContent
+ }
+ }
+
+ resp = append(resp, stateEvent)
+ }
+
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: resp,
+ }
+}
+
+// OnIncomingStateTypeRequest is called when a client makes a
+// /rooms/{roomID}/state/{type}/{statekey} request. It will look in current
+// state to see if there is an event with that type and state key, if there
+// is then (by default) we return the content, otherwise a 404.
+func OnIncomingStateTypeRequest(req *http.Request, db *storage.SyncServerDatabase, roomID string, evType, stateKey string) util.JSONResponse {
+ // TODO(#287): Auth request and handle the case where the user has left (where
+ // we should return the state at the poin they left)
+
+ logger := util.GetLogger(req.Context())
+ logger.WithFields(log.Fields{
+ "roomID": roomID,
+ "evType": evType,
+ "stateKey": stateKey,
+ }).Info("Fetching state")
+
+ event, err := db.GetStateEvent(req.Context(), roomID, evType, stateKey)
+ if err != nil {
+ return httputil.LogThenError(req, err)
+ }
+
+ if event == nil {
+ return util.JSONResponse{
+ Code: http.StatusNotFound,
+ JSON: jsonerror.NotFound("cannot find state"),
+ }
+ }
+
+ stateEvent := stateEventInStateResp{
+ ClientEvent: gomatrixserverlib.ToClientEvent(*event, gomatrixserverlib.FormatAll),
+ }
+
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: stateEvent.Content,
+ }
+}
diff --git a/syncapi/storage/account_data_table.go b/syncapi/storage/account_data_table.go
new file mode 100644
index 00000000..d4d74d15
--- /dev/null
+++ b/syncapi/storage/account_data_table.go
@@ -0,0 +1,141 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/common"
+
+ "github.com/matrix-org/dendrite/syncapi/types"
+)
+
+const accountDataSchema = `
+-- This sequence is shared between all the tables generated from kafka logs.
+CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id;
+
+-- Stores the types of account data that a user set has globally and in each room
+-- and the stream ID when that type was last updated.
+CREATE TABLE IF NOT EXISTS syncapi_account_data_type (
+ -- An incrementing ID which denotes the position in the log that this event resides at.
+ id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
+ -- ID of the user the data belongs to
+ user_id TEXT NOT NULL,
+ -- ID of the room the data is related to (empty string if not related to a specific room)
+ room_id TEXT NOT NULL,
+ -- Type of the data
+ type TEXT NOT NULL,
+
+ -- We don't want two entries of the same type for the same user
+ CONSTRAINT syncapi_account_data_unique UNIQUE (user_id, room_id, type)
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account_data_type(id);
+`
+
+const insertAccountDataSQL = "" +
+ "INSERT INTO syncapi_account_data_type (user_id, room_id, type) VALUES ($1, $2, $3)" +
+ " ON CONFLICT ON CONSTRAINT syncapi_account_data_unique" +
+ " DO UPDATE SET id = EXCLUDED.id" +
+ " RETURNING id"
+
+const selectAccountDataInRangeSQL = "" +
+ "SELECT room_id, type FROM syncapi_account_data_type" +
+ " WHERE user_id = $1 AND id > $2 AND id <= $3" +
+ " ORDER BY id ASC"
+
+const selectMaxAccountDataIDSQL = "" +
+ "SELECT MAX(id) FROM syncapi_account_data_type"
+
+type accountDataStatements struct {
+ insertAccountDataStmt *sql.Stmt
+ selectAccountDataInRangeStmt *sql.Stmt
+ selectMaxAccountDataIDStmt *sql.Stmt
+}
+
+func (s *accountDataStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(accountDataSchema)
+ if err != nil {
+ return
+ }
+ if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil {
+ return
+ }
+ if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil {
+ return
+ }
+ if s.selectMaxAccountDataIDStmt, err = db.Prepare(selectMaxAccountDataIDSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *accountDataStatements) insertAccountData(
+ ctx context.Context,
+ userID, roomID, dataType string,
+) (pos int64, err error) {
+ err = s.insertAccountDataStmt.QueryRowContext(ctx, userID, roomID, dataType).Scan(&pos)
+ return
+}
+
+func (s *accountDataStatements) selectAccountDataInRange(
+ ctx context.Context,
+ userID string,
+ oldPos, newPos types.StreamPosition,
+) (data map[string][]string, err error) {
+ data = make(map[string][]string)
+
+ // If both positions are the same, it means that the data was saved after the
+ // latest room event. In that case, we need to decrement the old position as
+ // it would prevent the SQL request from returning anything.
+ if oldPos == newPos {
+ oldPos--
+ }
+
+ rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos)
+ if err != nil {
+ return
+ }
+
+ for rows.Next() {
+ var dataType string
+ var roomID string
+
+ if err = rows.Scan(&roomID, &dataType); err != nil {
+ return
+ }
+
+ if len(data[roomID]) > 0 {
+ data[roomID] = append(data[roomID], dataType)
+ } else {
+ data[roomID] = []string{dataType}
+ }
+ }
+
+ return
+}
+
+func (s *accountDataStatements) selectMaxAccountDataID(
+ ctx context.Context, txn *sql.Tx,
+) (id int64, err error) {
+ var nullableID sql.NullInt64
+ stmt := common.TxStmt(txn, s.selectMaxAccountDataIDStmt)
+ err = stmt.QueryRowContext(ctx).Scan(&nullableID)
+ if nullableID.Valid {
+ id = nullableID.Int64
+ }
+ return
+}
diff --git a/syncapi/storage/current_room_state_table.go b/syncapi/storage/current_room_state_table.go
new file mode 100644
index 00000000..852bfd76
--- /dev/null
+++ b/syncapi/storage/current_room_state_table.go
@@ -0,0 +1,249 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const currentRoomStateSchema = `
+-- Stores the current room state for every room.
+CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
+ -- The 'room_id' key for the state event.
+ room_id TEXT NOT NULL,
+ -- The state event ID
+ event_id TEXT NOT NULL,
+ -- The state event type e.g 'm.room.member'
+ type TEXT NOT NULL,
+ -- The state_key value for this state event e.g ''
+ state_key TEXT NOT NULL,
+ -- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
+ event_json TEXT NOT NULL,
+ -- The 'content.membership' value if this event is an m.room.member event. For other
+ -- events, this will be NULL.
+ membership TEXT,
+ -- The serial ID of the output_room_events table when this event became
+ -- part of the current state of the room.
+ added_at BIGINT,
+ -- Clobber based on 3-uple of room_id, type and state_key
+ CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key)
+);
+-- for event deletion
+CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id);
+-- for querying membership states of users
+CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave';
+`
+
+const upsertRoomStateSQL = "" +
+ "INSERT INTO syncapi_current_room_state (room_id, event_id, type, state_key, event_json, membership, added_at)" +
+ " VALUES ($1, $2, $3, $4, $5, $6, $7)" +
+ " ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" +
+ " DO UPDATE SET event_id = $2, event_json = $5, membership = $6, added_at = $7"
+
+const deleteRoomStateByEventIDSQL = "" +
+ "DELETE FROM syncapi_current_room_state WHERE event_id = $1"
+
+const selectRoomIDsWithMembershipSQL = "" +
+ "SELECT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2"
+
+const selectCurrentStateSQL = "" +
+ "SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1"
+
+const selectJoinedUsersSQL = "" +
+ "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
+
+const selectStateEventSQL = "" +
+ "SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
+
+const selectEventsWithEventIDsSQL = "" +
+ "SELECT added_at, event_json FROM syncapi_current_room_state WHERE event_id = ANY($1)"
+
+type currentRoomStateStatements struct {
+ upsertRoomStateStmt *sql.Stmt
+ deleteRoomStateByEventIDStmt *sql.Stmt
+ selectRoomIDsWithMembershipStmt *sql.Stmt
+ selectCurrentStateStmt *sql.Stmt
+ selectJoinedUsersStmt *sql.Stmt
+ selectEventsWithEventIDsStmt *sql.Stmt
+ selectStateEventStmt *sql.Stmt
+}
+
+func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(currentRoomStateSchema)
+ if err != nil {
+ return
+ }
+ if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil {
+ return
+ }
+ if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil {
+ return
+ }
+ if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
+ return
+ }
+ if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil {
+ return
+ }
+ if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
+ return
+ }
+ if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
+ return
+ }
+ if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
+ return
+ }
+ return
+}
+
+// JoinedMemberLists returns a map of room ID to a list of joined user IDs.
+func (s *currentRoomStateStatements) selectJoinedUsers(
+ ctx context.Context,
+) (map[string][]string, error) {
+ rows, err := s.selectJoinedUsersStmt.QueryContext(ctx)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+
+ result := make(map[string][]string)
+ for rows.Next() {
+ var roomID string
+ var userID string
+ if err := rows.Scan(&roomID, &userID); err != nil {
+ return nil, err
+ }
+ users := result[roomID]
+ users = append(users, userID)
+ result[roomID] = users
+ }
+ return result, nil
+}
+
+// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
+func (s *currentRoomStateStatements) selectRoomIDsWithMembership(
+ ctx context.Context,
+ txn *sql.Tx,
+ userID string,
+ membership string, // nolint: unparam
+) ([]string, error) {
+ stmt := common.TxStmt(txn, s.selectRoomIDsWithMembershipStmt)
+ rows, err := stmt.QueryContext(ctx, userID, membership)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+
+ var result []string
+ for rows.Next() {
+ var roomID string
+ if err := rows.Scan(&roomID); err != nil {
+ return nil, err
+ }
+ result = append(result, roomID)
+ }
+ return result, nil
+}
+
+// CurrentState returns all the current state events for the given room.
+func (s *currentRoomStateStatements) selectCurrentState(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) ([]gomatrixserverlib.Event, error) {
+ stmt := common.TxStmt(txn, s.selectCurrentStateStmt)
+ rows, err := stmt.QueryContext(ctx, roomID)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+
+ return rowsToEvents(rows)
+}
+
+func (s *currentRoomStateStatements) deleteRoomStateByEventID(
+ ctx context.Context, txn *sql.Tx, eventID string,
+) error {
+ stmt := common.TxStmt(txn, s.deleteRoomStateByEventIDStmt)
+ _, err := stmt.ExecContext(ctx, eventID)
+ return err
+}
+
+func (s *currentRoomStateStatements) upsertRoomState(
+ ctx context.Context, txn *sql.Tx,
+ event gomatrixserverlib.Event, membership *string, addedAt int64,
+) error {
+ stmt := common.TxStmt(txn, s.upsertRoomStateStmt)
+ _, err := stmt.ExecContext(
+ ctx,
+ event.RoomID(),
+ event.EventID(),
+ event.Type(),
+ *event.StateKey(),
+ event.JSON(),
+ membership,
+ addedAt,
+ )
+ return err
+}
+
+func (s *currentRoomStateStatements) selectEventsWithEventIDs(
+ ctx context.Context, txn *sql.Tx, eventIDs []string,
+) ([]streamEvent, error) {
+ stmt := common.TxStmt(txn, s.selectEventsWithEventIDsStmt)
+ rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs))
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+ return rowsToStreamEvents(rows)
+}
+
+func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
+ result := []gomatrixserverlib.Event{}
+ for rows.Next() {
+ var eventBytes []byte
+ if err := rows.Scan(&eventBytes); err != nil {
+ return nil, err
+ }
+ // TODO: Handle redacted events
+ ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
+ if err != nil {
+ return nil, err
+ }
+ result = append(result, ev)
+ }
+ return result, nil
+}
+
+func (s *currentRoomStateStatements) selectStateEvent(
+ ctx context.Context, roomID, evType, stateKey string,
+) (*gomatrixserverlib.Event, error) {
+ stmt := s.selectStateEventStmt
+ var res []byte
+ err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false)
+ return &ev, err
+}
diff --git a/syncapi/storage/invites_table.go b/syncapi/storage/invites_table.go
new file mode 100644
index 00000000..88c98f7e
--- /dev/null
+++ b/syncapi/storage/invites_table.go
@@ -0,0 +1,133 @@
+package storage
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const inviteEventsSchema = `
+CREATE TABLE IF NOT EXISTS syncapi_invite_events (
+ id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ target_user_id TEXT NOT NULL,
+ event_json TEXT NOT NULL
+);
+
+-- For looking up the invites for a given user.
+CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx
+ ON syncapi_invite_events (target_user_id, id);
+
+-- For deleting old invites
+CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx
+ ON syncapi_invite_events(target_user_id, id);
+`
+
+const insertInviteEventSQL = "" +
+ "INSERT INTO syncapi_invite_events (" +
+ " room_id, event_id, target_user_id, event_json" +
+ ") VALUES ($1, $2, $3, $4) RETURNING id"
+
+const deleteInviteEventSQL = "" +
+ "DELETE FROM syncapi_invite_events WHERE event_id = $1"
+
+const selectInviteEventsInRangeSQL = "" +
+ "SELECT room_id, event_json FROM syncapi_invite_events" +
+ " WHERE target_user_id = $1 AND id > $2 AND id <= $3" +
+ " ORDER BY id DESC"
+
+const selectMaxInviteIDSQL = "" +
+ "SELECT MAX(id) FROM syncapi_invite_events"
+
+type inviteEventsStatements struct {
+ insertInviteEventStmt *sql.Stmt
+ selectInviteEventsInRangeStmt *sql.Stmt
+ deleteInviteEventStmt *sql.Stmt
+ selectMaxInviteIDStmt *sql.Stmt
+}
+
+func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(inviteEventsSchema)
+ if err != nil {
+ return
+ }
+ if s.insertInviteEventStmt, err = db.Prepare(insertInviteEventSQL); err != nil {
+ return
+ }
+ if s.selectInviteEventsInRangeStmt, err = db.Prepare(selectInviteEventsInRangeSQL); err != nil {
+ return
+ }
+ if s.deleteInviteEventStmt, err = db.Prepare(deleteInviteEventSQL); err != nil {
+ return
+ }
+ if s.selectMaxInviteIDStmt, err = db.Prepare(selectMaxInviteIDSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *inviteEventsStatements) insertInviteEvent(
+ ctx context.Context, inviteEvent gomatrixserverlib.Event,
+) (streamPos int64, err error) {
+ err = s.insertInviteEventStmt.QueryRowContext(
+ ctx,
+ inviteEvent.RoomID(),
+ inviteEvent.EventID(),
+ *inviteEvent.StateKey(),
+ inviteEvent.JSON(),
+ ).Scan(&streamPos)
+ return
+}
+
+func (s *inviteEventsStatements) deleteInviteEvent(
+ ctx context.Context, inviteEventID string,
+) error {
+ _, err := s.deleteInviteEventStmt.ExecContext(ctx, inviteEventID)
+ return err
+}
+
+// selectInviteEventsInRange returns a map of room ID to invite event for the
+// active invites for the target user ID in the supplied range.
+func (s *inviteEventsStatements) selectInviteEventsInRange(
+ ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos int64,
+) (map[string]gomatrixserverlib.Event, error) {
+ stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt)
+ rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+ result := map[string]gomatrixserverlib.Event{}
+ for rows.Next() {
+ var (
+ roomID string
+ eventJSON []byte
+ )
+ if err = rows.Scan(&roomID, &eventJSON); err != nil {
+ return nil, err
+ }
+
+ event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false)
+ if err != nil {
+ return nil, err
+ }
+
+ result[roomID] = event
+ }
+ return result, nil
+}
+
+func (s *inviteEventsStatements) selectMaxInviteID(
+ ctx context.Context, txn *sql.Tx,
+) (id int64, err error) {
+ var nullableID sql.NullInt64
+ stmt := common.TxStmt(txn, s.selectMaxInviteIDStmt)
+ err = stmt.QueryRowContext(ctx).Scan(&nullableID)
+ if nullableID.Valid {
+ id = nullableID.Int64
+ }
+ return
+}
diff --git a/syncapi/storage/output_room_events_table.go b/syncapi/storage/output_room_events_table.go
new file mode 100644
index 00000000..035db988
--- /dev/null
+++ b/syncapi/storage/output_room_events_table.go
@@ -0,0 +1,294 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "database/sql"
+ "sort"
+
+ "github.com/matrix-org/dendrite/roomserver/api"
+
+ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ log "github.com/sirupsen/logrus"
+)
+
+const outputRoomEventsSchema = `
+-- This sequence is shared between all the tables generated from kafka logs.
+CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id;
+
+-- Stores output room events received from the roomserver.
+CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
+ -- An incrementing ID which denotes the position in the log that this event resides at.
+ -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments.
+ -- This isn't a problem for us since we just want to order by this field.
+ id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
+ -- The event ID for the event
+ event_id TEXT NOT NULL,
+ -- The 'room_id' key for the event.
+ room_id TEXT NOT NULL,
+ -- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
+ event_json TEXT NOT NULL,
+ -- A list of event IDs which represent a delta of added/removed room state. This can be NULL
+ -- if there is no delta.
+ add_state_ids TEXT[],
+ remove_state_ids TEXT[],
+ device_id TEXT, -- The local device that sent the event, if any
+ transaction_id TEXT -- The transaction id used to send the event, if any
+);
+-- for event selection
+CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id);
+`
+
+const insertEventSQL = "" +
+ "INSERT INTO syncapi_output_room_events (" +
+ " room_id, event_id, event_json, add_state_ids, remove_state_ids, device_id, transaction_id" +
+ ") VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id"
+
+const selectEventsSQL = "" +
+ "SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)"
+
+const selectRecentEventsSQL = "" +
+ "SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" +
+ " WHERE room_id = $1 AND id > $2 AND id <= $3" +
+ " ORDER BY id DESC LIMIT $4"
+
+const selectMaxEventIDSQL = "" +
+ "SELECT MAX(id) FROM syncapi_output_room_events"
+
+// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
+const selectStateInRangeSQL = "" +
+ "SELECT id, event_json, add_state_ids, remove_state_ids" +
+ " FROM syncapi_output_room_events" +
+ " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
+ " ORDER BY id ASC"
+
+type outputRoomEventsStatements struct {
+ insertEventStmt *sql.Stmt
+ selectEventsStmt *sql.Stmt
+ selectMaxEventIDStmt *sql.Stmt
+ selectRecentEventsStmt *sql.Stmt
+ selectStateInRangeStmt *sql.Stmt
+}
+
+func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(outputRoomEventsSchema)
+ if err != nil {
+ return
+ }
+ if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
+ return
+ }
+ if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil {
+ return
+ }
+ if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil {
+ return
+ }
+ if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
+ return
+ }
+ if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
+ return
+ }
+ return
+}
+
+// selectStateInRange returns the state events between the two given stream positions, exclusive of oldPos, inclusive of newPos.
+// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
+// two positions, only the most recent state is returned.
+func (s *outputRoomEventsStatements) selectStateInRange(
+ ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition,
+) (map[string]map[string]bool, map[string]streamEvent, error) {
+ stmt := common.TxStmt(txn, s.selectStateInRangeStmt)
+
+ rows, err := stmt.QueryContext(ctx, oldPos, newPos)
+ if err != nil {
+ return nil, nil, err
+ }
+ // Fetch all the state change events for all rooms between the two positions then loop each event and:
+ // - Keep a cache of the event by ID (99% of state change events are for the event itself)
+ // - For each room ID, build up an array of event IDs which represents cumulative adds/removes
+ // For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID
+ // if they aren't in the event ID cache. We don't handle state deletion yet.
+ eventIDToEvent := make(map[string]streamEvent)
+
+ // RoomID => A set (map[string]bool) of state event IDs which are between the two positions
+ stateNeeded := make(map[string]map[string]bool)
+
+ for rows.Next() {
+ var (
+ streamPos int64
+ eventBytes []byte
+ addIDs pq.StringArray
+ delIDs pq.StringArray
+ )
+ if err := rows.Scan(&streamPos, &eventBytes, &addIDs, &delIDs); err != nil {
+ return nil, nil, err
+ }
+ // Sanity check for deleted state and whine if we see it. We don't need to do anything
+ // since it'll just mark the event as not being needed.
+ if len(addIDs) < len(delIDs) {
+ log.WithFields(log.Fields{
+ "since": oldPos,
+ "current": newPos,
+ "adds": addIDs,
+ "dels": delIDs,
+ }).Warn("StateBetween: ignoring deleted state")
+ }
+
+ // TODO: Handle redacted events
+ ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
+ if err != nil {
+ return nil, nil, err
+ }
+ needSet := stateNeeded[ev.RoomID()]
+ if needSet == nil { // make set if required
+ needSet = make(map[string]bool)
+ }
+ for _, id := range delIDs {
+ needSet[id] = false
+ }
+ for _, id := range addIDs {
+ needSet[id] = true
+ }
+ stateNeeded[ev.RoomID()] = needSet
+
+ eventIDToEvent[ev.EventID()] = streamEvent{
+ Event: ev,
+ streamPosition: types.StreamPosition(streamPos),
+ }
+ }
+
+ return stateNeeded, eventIDToEvent, nil
+}
+
+// MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied,
+// then this function should only ever be used at startup, as it will race with inserting events if it is
+// done afterwards. If there are no inserted events, 0 is returned.
+func (s *outputRoomEventsStatements) selectMaxEventID(
+ ctx context.Context, txn *sql.Tx,
+) (id int64, err error) {
+ var nullableID sql.NullInt64
+ stmt := common.TxStmt(txn, s.selectMaxEventIDStmt)
+ err = stmt.QueryRowContext(ctx).Scan(&nullableID)
+ if nullableID.Valid {
+ id = nullableID.Int64
+ }
+ return
+}
+
+// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. Returns the position
+// of the inserted event.
+func (s *outputRoomEventsStatements) insertEvent(
+ ctx context.Context, txn *sql.Tx,
+ event *gomatrixserverlib.Event, addState, removeState []string,
+ transactionID *api.TransactionID,
+) (streamPos int64, err error) {
+ var deviceID, txnID *string
+ if transactionID != nil {
+ deviceID = &transactionID.DeviceID
+ txnID = &transactionID.TransactionID
+ }
+
+ stmt := common.TxStmt(txn, s.insertEventStmt)
+ err = stmt.QueryRowContext(
+ ctx,
+ event.RoomID(),
+ event.EventID(),
+ event.JSON(),
+ pq.StringArray(addState),
+ pq.StringArray(removeState),
+ deviceID,
+ txnID,
+ ).Scan(&streamPos)
+ return
+}
+
+// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'.
+func (s *outputRoomEventsStatements) selectRecentEvents(
+ ctx context.Context, txn *sql.Tx,
+ roomID string, fromPos, toPos types.StreamPosition, limit int,
+) ([]streamEvent, error) {
+ stmt := common.TxStmt(txn, s.selectRecentEventsStmt)
+ rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+ events, err := rowsToStreamEvents(rows)
+ if err != nil {
+ return nil, err
+ }
+ // The events need to be returned from oldest to latest, which isn't
+ // necessary the way the SQL query returns them, so a sort is necessary to
+ // ensure the events are in the right order in the slice.
+ sort.SliceStable(events, func(i int, j int) bool {
+ return events[i].streamPosition < events[j].streamPosition
+ })
+ return events, nil
+}
+
+// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing
+// from the database.
+func (s *outputRoomEventsStatements) selectEvents(
+ ctx context.Context, txn *sql.Tx, eventIDs []string,
+) ([]streamEvent, error) {
+ stmt := common.TxStmt(txn, s.selectEventsStmt)
+ rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs))
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+ return rowsToStreamEvents(rows)
+}
+
+func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) {
+ var result []streamEvent
+ for rows.Next() {
+ var (
+ streamPos int64
+ eventBytes []byte
+ deviceID *string
+ txnID *string
+ transactionID *api.TransactionID
+ )
+ if err := rows.Scan(&streamPos, &eventBytes, &deviceID, &txnID); err != nil {
+ return nil, err
+ }
+ // TODO: Handle redacted events
+ ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
+ if err != nil {
+ return nil, err
+ }
+
+ if deviceID != nil && txnID != nil {
+ transactionID = &api.TransactionID{
+ DeviceID: *deviceID,
+ TransactionID: *txnID,
+ }
+ }
+
+ result = append(result, streamEvent{
+ Event: ev,
+ streamPosition: types.StreamPosition(streamPos),
+ transactionID: transactionID,
+ })
+ }
+ return result, nil
+}
diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/syncserver.go
new file mode 100644
index 00000000..ec973e2c
--- /dev/null
+++ b/syncapi/storage/syncserver.go
@@ -0,0 +1,695 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+
+ "github.com/sirupsen/logrus"
+
+ "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ // Import the postgres database driver.
+ _ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type stateDelta struct {
+ roomID string
+ stateEvents []gomatrixserverlib.Event
+ membership string
+ // The stream position of the latest membership event for this user, if applicable.
+ // Can be 0 if there is no membership event in this delta.
+ membershipPos types.StreamPosition
+}
+
+// Same as gomatrixserverlib.Event but also has the stream position for this event.
+type streamEvent struct {
+ gomatrixserverlib.Event
+ streamPosition types.StreamPosition
+ transactionID *api.TransactionID
+}
+
+// SyncServerDatabase represents a sync server database
+type SyncServerDatabase struct {
+ db *sql.DB
+ common.PartitionOffsetStatements
+ accountData accountDataStatements
+ events outputRoomEventsStatements
+ roomstate currentRoomStateStatements
+ invites inviteEventsStatements
+}
+
+// NewSyncServerDatabase creates a new sync server database
+func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
+ var d SyncServerDatabase
+ var err error
+ if d.db, err = sql.Open("postgres", dataSourceName); err != nil {
+ return nil, err
+ }
+ if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil {
+ return nil, err
+ }
+ if err = d.accountData.prepare(d.db); err != nil {
+ return nil, err
+ }
+ if err = d.events.prepare(d.db); err != nil {
+ return nil, err
+ }
+ if err := d.roomstate.prepare(d.db); err != nil {
+ return nil, err
+ }
+ if err := d.invites.prepare(d.db); err != nil {
+ return nil, err
+ }
+ return &d, nil
+}
+
+// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
+func (d *SyncServerDatabase) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) {
+ return d.roomstate.selectJoinedUsers(ctx)
+}
+
+// Events lookups a list of event by their event ID.
+// Returns a list of events matching the requested IDs found in the database.
+// If an event is not found in the database then it will be omitted from the list.
+// Returns an error if there was a problem talking with the database.
+// Does not include any transaction IDs in the returned events.
+func (d *SyncServerDatabase) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) {
+ streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs)
+ if err != nil {
+ return nil, err
+ }
+
+ // We don't include a device here as we only include transaction IDs in
+ // incremental syncs.
+ return streamEventsToEvents(nil, streamEvents), nil
+}
+
+// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
+// when generating the stream position for this event. Returns the sync stream position for the inserted event.
+// Returns an error if there was a problem inserting this event.
+func (d *SyncServerDatabase) WriteEvent(
+ ctx context.Context,
+ ev *gomatrixserverlib.Event,
+ addStateEvents []gomatrixserverlib.Event,
+ addStateEventIDs, removeStateEventIDs []string,
+ transactionID *api.TransactionID,
+) (streamPos types.StreamPosition, returnErr error) {
+ returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error {
+ var err error
+ pos, err := d.events.insertEvent(ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID)
+ if err != nil {
+ return err
+ }
+ streamPos = types.StreamPosition(pos)
+
+ if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 {
+ // Nothing to do, the event may have just been a message event.
+ return nil
+ }
+
+ return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, streamPos)
+ })
+ return
+}
+
+func (d *SyncServerDatabase) updateRoomState(
+ ctx context.Context, txn *sql.Tx,
+ removedEventIDs []string,
+ addedEvents []gomatrixserverlib.Event,
+ streamPos types.StreamPosition,
+) error {
+ // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
+ for _, eventID := range removedEventIDs {
+ if err := d.roomstate.deleteRoomStateByEventID(ctx, txn, eventID); err != nil {
+ return err
+ }
+ }
+
+ for _, event := range addedEvents {
+ if event.StateKey() == nil {
+ // ignore non state events
+ continue
+ }
+ var membership *string
+ if event.Type() == "m.room.member" {
+ value, err := event.Membership()
+ if err != nil {
+ return err
+ }
+ membership = &value
+ }
+ if err := d.roomstate.upsertRoomState(ctx, txn, event, membership, int64(streamPos)); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key
+// If no event could be found, returns nil
+// If there was an issue during the retrieval, returns an error
+func (d *SyncServerDatabase) GetStateEvent(
+ ctx context.Context, roomID, evType, stateKey string,
+) (*gomatrixserverlib.Event, error) {
+ return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey)
+}
+
+// GetStateEventsForRoom fetches the state events for a given room.
+// Returns an empty slice if no state events could be found for this room.
+// Returns an error if there was an issue with the retrieval.
+func (d *SyncServerDatabase) GetStateEventsForRoom(
+ ctx context.Context, roomID string,
+) (stateEvents []gomatrixserverlib.Event, err error) {
+ err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
+ stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID)
+ return err
+ })
+ return
+}
+
+// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
+func (d *SyncServerDatabase) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) {
+ return d.syncStreamPositionTx(ctx, nil)
+}
+
+func (d *SyncServerDatabase) syncStreamPositionTx(
+ ctx context.Context, txn *sql.Tx,
+) (types.StreamPosition, error) {
+ maxID, err := d.events.selectMaxEventID(ctx, txn)
+ if err != nil {
+ return 0, err
+ }
+ maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn)
+ if err != nil {
+ return 0, err
+ }
+ if maxAccountDataID > maxID {
+ maxID = maxAccountDataID
+ }
+ maxInviteID, err := d.invites.selectMaxInviteID(ctx, txn)
+ if err != nil {
+ return 0, err
+ }
+ if maxInviteID > maxID {
+ maxID = maxInviteID
+ }
+ return types.StreamPosition(maxID), nil
+}
+
+// IncrementalSync returns all the data needed in order to create an incremental
+// sync response for the given user. Events returned will include any client
+// transaction IDs associated with the given device. These transaction IDs come
+// from when the device sent the event via an API that included a transaction
+// ID.
+func (d *SyncServerDatabase) IncrementalSync(
+ ctx context.Context,
+ device authtypes.Device,
+ fromPos, toPos types.StreamPosition,
+ numRecentEventsPerRoom int,
+) (*types.Response, error) {
+ txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
+ if err != nil {
+ return nil, err
+ }
+ var succeeded bool
+ defer common.EndTransaction(txn, &succeeded)
+
+ // Work out which rooms to return in the response. This is done by getting not only the currently
+ // joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions.
+ // This works out what the 'state' key should be for each room as well as which membership block
+ // to put the room into.
+ deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID)
+ if err != nil {
+ return nil, err
+ }
+
+ res := types.NewResponse(toPos)
+ for _, delta := range deltas {
+ err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ // TODO: This should be done in getStateDeltas
+ if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil {
+ return nil, err
+ }
+
+ succeeded = true
+ return res, nil
+}
+
+// CompleteSync a complete /sync API response for the given user.
+func (d *SyncServerDatabase) CompleteSync(
+ ctx context.Context, userID string, numRecentEventsPerRoom int,
+) (*types.Response, error) {
+ // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
+ // a consistent view of the database throughout. This includes extracting the sync stream position.
+ // This does have the unfortunate side-effect that all the matrixy logic resides in this function,
+ // but it's better to not hide the fact that this is being done in a transaction.
+ txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
+ if err != nil {
+ return nil, err
+ }
+ var succeeded bool
+ defer common.EndTransaction(txn, &succeeded)
+
+ // Get the current stream position which we will base the sync response on.
+ pos, err := d.syncStreamPositionTx(ctx, txn)
+ if err != nil {
+ return nil, err
+ }
+
+ // Extract room state and recent events for all rooms the user is joined to.
+ roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join")
+ if err != nil {
+ return nil, err
+ }
+
+ // Build up a /sync response. Add joined rooms.
+ res := types.NewResponse(pos)
+ for _, roomID := range roomIDs {
+ var stateEvents []gomatrixserverlib.Event
+ stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID)
+ if err != nil {
+ return nil, err
+ }
+ // TODO: When filters are added, we may need to call this multiple times to get enough events.
+ // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
+ var recentStreamEvents []streamEvent
+ recentStreamEvents, err = d.events.selectRecentEvents(
+ ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ // We don't include a device here as we don't need to send down
+ // transaction IDs for complete syncs
+ recentEvents := streamEventsToEvents(nil, recentStreamEvents)
+
+ stateEvents = removeDuplicates(stateEvents, recentEvents)
+ jr := types.NewJoinResponse()
+ if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 {
+ jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String()
+ } else {
+ jr.Timeline.PrevBatch = types.StreamPosition(1).String()
+ }
+ jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+ jr.Timeline.Limited = true
+ jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
+ res.Rooms.Join[roomID] = *jr
+ }
+
+ if err = d.addInvitesToResponse(ctx, txn, userID, 0, pos, res); err != nil {
+ return nil, err
+ }
+
+ succeeded = true
+ return res, err
+}
+
+var txReadOnlySnapshot = sql.TxOptions{
+ // Set the isolation level so that we see a snapshot of the database.
+ // In PostgreSQL repeatable read transactions will see a snapshot taken
+ // at the first query, and since the transaction is read-only it can't
+ // run into any serialisation errors.
+ // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ
+ Isolation: sql.LevelRepeatableRead,
+ ReadOnly: true,
+}
+
+// GetAccountDataInRange returns all account data for a given user inserted or
+// updated between two given positions
+// Returns a map following the format data[roomID] = []dataTypes
+// If no data is retrieved, returns an empty map
+// If there was an issue with the retrieval, returns an error
+func (d *SyncServerDatabase) GetAccountDataInRange(
+ ctx context.Context, userID string, oldPos, newPos types.StreamPosition,
+) (map[string][]string, error) {
+ return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos)
+}
+
+// UpsertAccountData keeps track of new or updated account data, by saving the type
+// of the new/updated data, and the user ID and room ID the data is related to (empty)
+// room ID means the data isn't specific to any room)
+// If no data with the given type, user ID and room ID exists in the database,
+// creates a new row, else update the existing one
+// Returns an error if there was an issue with the upsert
+func (d *SyncServerDatabase) UpsertAccountData(
+ ctx context.Context, userID, roomID, dataType string,
+) (types.StreamPosition, error) {
+ pos, err := d.accountData.insertAccountData(ctx, userID, roomID, dataType)
+ return types.StreamPosition(pos), err
+}
+
+// AddInviteEvent stores a new invite event for a user.
+// If the invite was successfully stored this returns the stream ID it was stored at.
+// Returns an error if there was a problem communicating with the database.
+func (d *SyncServerDatabase) AddInviteEvent(
+ ctx context.Context, inviteEvent gomatrixserverlib.Event,
+) (types.StreamPosition, error) {
+ pos, err := d.invites.insertInviteEvent(ctx, inviteEvent)
+ return types.StreamPosition(pos), err
+}
+
+// RetireInviteEvent removes an old invite event from the database.
+// Returns an error if there was a problem communicating with the database.
+func (d *SyncServerDatabase) RetireInviteEvent(
+ ctx context.Context, inviteEventID string,
+) error {
+ // TODO: Record that invite has been retired in a stream so that we can
+ // notify the user in an incremental sync.
+ err := d.invites.deleteInviteEvent(ctx, inviteEventID)
+ return err
+}
+
+func (d *SyncServerDatabase) addInvitesToResponse(
+ ctx context.Context, txn *sql.Tx,
+ userID string,
+ fromPos, toPos types.StreamPosition,
+ res *types.Response,
+) error {
+ invites, err := d.invites.selectInviteEventsInRange(
+ ctx, txn, userID, int64(fromPos), int64(toPos),
+ )
+ if err != nil {
+ return err
+ }
+ for roomID, inviteEvent := range invites {
+ ir := types.NewInviteResponse()
+ ir.InviteState.Events = gomatrixserverlib.ToClientEvents(
+ []gomatrixserverlib.Event{inviteEvent}, gomatrixserverlib.FormatSync,
+ )
+ // TODO: add the invite state from the invite event.
+ res.Rooms.Invite[roomID] = *ir
+ }
+ return nil
+}
+
+// addRoomDeltaToResponse adds a room state delta to a sync response
+func (d *SyncServerDatabase) addRoomDeltaToResponse(
+ ctx context.Context,
+ device *authtypes.Device,
+ txn *sql.Tx,
+ fromPos, toPos types.StreamPosition,
+ delta stateDelta,
+ numRecentEventsPerRoom int,
+ res *types.Response,
+) error {
+ endPos := toPos
+ if delta.membershipPos > 0 && delta.membership == "leave" {
+ // make sure we don't leak recent events after the leave event.
+ // TODO: History visibility makes this somewhat complex to handle correctly. For example:
+ // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
+ // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
+ // in a single /sync request
+ // This is all "okay" assuming history_visibility == "shared" which it is by default.
+ endPos = delta.membershipPos
+ }
+ recentStreamEvents, err := d.events.selectRecentEvents(
+ ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom,
+ )
+ if err != nil {
+ return err
+ }
+ recentEvents := streamEventsToEvents(device, recentStreamEvents)
+ delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
+
+ // Don't bother appending empty room entries
+ if len(recentEvents) == 0 && len(delta.stateEvents) == 0 {
+ return nil
+ }
+
+ switch delta.membership {
+ case "join":
+ jr := types.NewJoinResponse()
+ if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 {
+ jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String()
+ } else {
+ jr.Timeline.PrevBatch = types.StreamPosition(1).String()
+ }
+ jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+ jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
+ jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
+ res.Rooms.Join[delta.roomID] = *jr
+ case "leave":
+ fallthrough // transitions to leave are the same as ban
+ case "ban":
+ // TODO: recentEvents may contain events that this user is not allowed to see because they are
+ // no longer in the room.
+ lr := types.NewLeaveResponse()
+ if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 {
+ lr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String()
+ } else {
+ lr.Timeline.PrevBatch = types.StreamPosition(1).String()
+ }
+ lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+ lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
+ lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
+ res.Rooms.Leave[delta.roomID] = *lr
+ }
+
+ return nil
+}
+
+// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database.
+// Returns a map of room ID to list of events.
+func (d *SyncServerDatabase) fetchStateEvents(
+ ctx context.Context, txn *sql.Tx,
+ roomIDToEventIDSet map[string]map[string]bool,
+ eventIDToEvent map[string]streamEvent,
+) (map[string][]streamEvent, error) {
+ stateBetween := make(map[string][]streamEvent)
+ missingEvents := make(map[string][]string)
+ for roomID, ids := range roomIDToEventIDSet {
+ events := stateBetween[roomID]
+ for id, need := range ids {
+ if !need {
+ continue // deleted state
+ }
+ e, ok := eventIDToEvent[id]
+ if ok {
+ events = append(events, e)
+ } else {
+ m := missingEvents[roomID]
+ m = append(m, id)
+ missingEvents[roomID] = m
+ }
+ }
+ stateBetween[roomID] = events
+ }
+
+ if len(missingEvents) > 0 {
+ // This happens when add_state_ids has an event ID which is not in the provided range.
+ // We need to explicitly fetch them.
+ allMissingEventIDs := []string{}
+ for _, missingEvIDs := range missingEvents {
+ allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...)
+ }
+ evs, err := d.fetchMissingStateEvents(ctx, txn, allMissingEventIDs)
+ if err != nil {
+ return nil, err
+ }
+ // we know we got them all otherwise an error would've been returned, so just loop the events
+ for _, ev := range evs {
+ roomID := ev.RoomID()
+ stateBetween[roomID] = append(stateBetween[roomID], ev)
+ }
+ }
+ return stateBetween, nil
+}
+
+func (d *SyncServerDatabase) fetchMissingStateEvents(
+ ctx context.Context, txn *sql.Tx, eventIDs []string,
+) ([]streamEvent, error) {
+ // Fetch from the events table first so we pick up the stream ID for the
+ // event.
+ events, err := d.events.selectEvents(ctx, txn, eventIDs)
+ if err != nil {
+ return nil, err
+ }
+
+ have := map[string]bool{}
+ for _, event := range events {
+ have[event.EventID()] = true
+ }
+ var missing []string
+ for _, eventID := range eventIDs {
+ if !have[eventID] {
+ missing = append(missing, eventID)
+ }
+ }
+ if len(missing) == 0 {
+ return events, nil
+ }
+
+ // If they are missing from the events table then they should be state
+ // events that we received from outside the main event stream.
+ // These should be in the room state table.
+ stateEvents, err := d.roomstate.selectEventsWithEventIDs(ctx, txn, missing)
+
+ if err != nil {
+ return nil, err
+ }
+ if len(stateEvents) != len(missing) {
+ return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing))
+ }
+ events = append(events, stateEvents...)
+ return events, nil
+}
+
+func (d *SyncServerDatabase) getStateDeltas(
+ ctx context.Context, device *authtypes.Device, txn *sql.Tx,
+ fromPos, toPos types.StreamPosition, userID string,
+) ([]stateDelta, error) {
+ // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
+ // - Get membership list changes for this user in this sync response
+ // - For each room which has membership list changes:
+ // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO).
+ // If it is, then we need to send the full room state down (and 'limited' is always true).
+ // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
+ // * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block.
+ // - Get all CURRENTLY joined rooms, and add them to 'joined' block.
+ var deltas []stateDelta
+
+ // get all the state events ever between these two positions
+ stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos)
+ if err != nil {
+ return nil, err
+ }
+ state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
+ if err != nil {
+ return nil, err
+ }
+
+ for roomID, stateStreamEvents := range state {
+ for _, ev := range stateStreamEvents {
+ // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
+ // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this,
+ // dupe join events will result in the entire room state coming down to the client again. This is added in
+ // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
+ // the timeline.
+ if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
+ if membership == "join" {
+ // send full room state down instead of a delta
+ var allState []gomatrixserverlib.Event
+ allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID)
+ if err != nil {
+ return nil, err
+ }
+ s := make([]streamEvent, len(allState))
+ for i := 0; i < len(s); i++ {
+ s[i] = streamEvent{Event: allState[i], streamPosition: types.StreamPosition(0)}
+ }
+ state[roomID] = s
+ continue // we'll add this room in when we do joined rooms
+ }
+
+ deltas = append(deltas, stateDelta{
+ membership: membership,
+ membershipPos: ev.streamPosition,
+ stateEvents: streamEventsToEvents(device, stateStreamEvents),
+ roomID: roomID,
+ })
+ break
+ }
+ }
+ }
+
+ // Add in currently joined rooms
+ joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join")
+ if err != nil {
+ return nil, err
+ }
+ for _, joinedRoomID := range joinedRoomIDs {
+ deltas = append(deltas, stateDelta{
+ membership: "join",
+ stateEvents: streamEventsToEvents(device, state[joinedRoomID]),
+ roomID: joinedRoomID,
+ })
+ }
+
+ return deltas, nil
+}
+
+// streamEventsToEvents converts streamEvent to Event. If device is non-nil and
+// matches the streamevent.transactionID device then the transaction ID gets
+// added to the unsigned section of the output event.
+func streamEventsToEvents(device *authtypes.Device, in []streamEvent) []gomatrixserverlib.Event {
+ out := make([]gomatrixserverlib.Event, len(in))
+ for i := 0; i < len(in); i++ {
+ out[i] = in[i].Event
+ if device != nil && in[i].transactionID != nil {
+ if device.UserID == in[i].Sender() && device.ID == in[i].transactionID.DeviceID {
+ err := out[i].SetUnsignedField(
+ "transaction_id", in[i].transactionID.TransactionID,
+ )
+ if err != nil {
+ logrus.WithFields(logrus.Fields{
+ "event_id": out[i].EventID(),
+ }).WithError(err).Warnf("Failed to add transaction ID to event")
+ }
+ }
+ }
+ }
+ return out
+}
+
+// There may be some overlap where events in stateEvents are already in recentEvents, so filter
+// them out so we don't include them twice in the /sync response. They should be in recentEvents
+// only, so clients get to the correct state once they have rolled forward.
+func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gomatrixserverlib.Event {
+ for _, recentEv := range recentEvents {
+ if recentEv.StateKey() == nil {
+ continue // not a state event
+ }
+ // TODO: This is a linear scan over all the current state events in this room. This will
+ // be slow for big rooms. We should instead sort the state events by event ID (ORDER BY)
+ // then do a binary search to find matching events, similar to what roomserver does.
+ for j := 0; j < len(stateEvents); j++ {
+ if stateEvents[j].EventID() == recentEv.EventID() {
+ // overwrite the element to remove with the last element then pop the last element.
+ // This is orders of magnitude faster than re-slicing, but doesn't preserve ordering
+ // (we don't care about the order of stateEvents)
+ stateEvents[j] = stateEvents[len(stateEvents)-1]
+ stateEvents = stateEvents[:len(stateEvents)-1]
+ break // there shouldn't be multiple events with the same event ID
+ }
+ }
+ }
+ return stateEvents
+}
+
+// getMembershipFromEvent returns the value of content.membership iff the event is a state event
+// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
+func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
+ if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
+ membership, err := ev.Membership()
+ if err != nil {
+ return ""
+ }
+ return membership
+ }
+ return ""
+}
diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go
new file mode 100644
index 00000000..5ed701d8
--- /dev/null
+++ b/syncapi/sync/notifier.go
@@ -0,0 +1,243 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sync
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ log "github.com/sirupsen/logrus"
+)
+
+// Notifier will wake up sleeping requests when there is some new data.
+// It does not tell requests what that data is, only the stream position which
+// they can use to get at it. This is done to prevent races whereby we tell the caller
+// the event, but the token has already advanced by the time they fetch it, resulting
+// in missed events.
+type Notifier struct {
+ // A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
+ roomIDToJoinedUsers map[string]userIDSet
+ // Protects currPos and userStreams.
+ streamLock *sync.Mutex
+ // The latest sync stream position
+ currPos types.StreamPosition
+ // A map of user_id => UserStream which can be used to wake a given user's /sync request.
+ userStreams map[string]*UserStream
+ // The last time we cleaned out stale entries from the userStreams map
+ lastCleanUpTime time.Time
+}
+
+// NewNotifier creates a new notifier set to the given stream position.
+// In order for this to be of any use, the Notifier needs to be told all rooms and
+// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
+func NewNotifier(pos types.StreamPosition) *Notifier {
+ return &Notifier{
+ currPos: pos,
+ roomIDToJoinedUsers: make(map[string]userIDSet),
+ userStreams: make(map[string]*UserStream),
+ streamLock: &sync.Mutex{},
+ lastCleanUpTime: time.Now(),
+ }
+}
+
+// OnNewEvent is called when a new event is received from the room server. Must only be
+// called from a single goroutine, to avoid races between updates which could set the
+// current position in the stream incorrectly.
+// Can be called either with a *gomatrixserverlib.Event, or with an user ID
+func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos types.StreamPosition) {
+ // update the current position then notify relevant /sync streams.
+ // This needs to be done PRIOR to waking up users as they will read this value.
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+ n.currPos = pos
+
+ n.removeEmptyUserStreams()
+
+ if ev != nil {
+ // Map this event's room_id to a list of joined users, and wake them up.
+ userIDs := n.joinedUsers(ev.RoomID())
+ // If this is an invite, also add in the invitee to this list.
+ if ev.Type() == "m.room.member" && ev.StateKey() != nil {
+ targetUserID := *ev.StateKey()
+ membership, err := ev.Membership()
+ if err != nil {
+ log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
+ "Notifier.OnNewEvent: Failed to unmarshal member event",
+ )
+ } else {
+ // Keep the joined user map up-to-date
+ switch membership {
+ case "invite":
+ userIDs = append(userIDs, targetUserID)
+ case "join":
+ // Manually append the new user's ID so they get notified
+ // along all members in the room
+ userIDs = append(userIDs, targetUserID)
+ n.addJoinedUser(ev.RoomID(), targetUserID)
+ case "leave":
+ fallthrough
+ case "ban":
+ n.removeJoinedUser(ev.RoomID(), targetUserID)
+ }
+ }
+ }
+
+ for _, toNotifyUserID := range userIDs {
+ n.wakeupUser(toNotifyUserID, pos)
+ }
+ } else if len(userID) > 0 {
+ n.wakeupUser(userID, pos)
+ }
+}
+
+// GetListener returns a UserStreamListener that can be used to wait for
+// updates for a user. Must be closed.
+// notify for anything before sincePos
+func (n *Notifier) GetListener(req syncRequest) UserStreamListener {
+ // Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
+ // - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
+ // - Incoming events wake requests for a matching room ID
+ // - Incoming events wake requests for a matching user ID (needed for invites)
+
+ // TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked,
+ // but given we don't do /events, let's pretend it doesn't exist.
+
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.removeEmptyUserStreams()
+
+ return n.fetchUserStream(req.device.UserID, true).GetListener(req.ctx)
+}
+
+// Load the membership states required to notify users correctly.
+func (n *Notifier) Load(ctx context.Context, db *storage.SyncServerDatabase) error {
+ roomToUsers, err := db.AllJoinedUsersInRooms(ctx)
+ if err != nil {
+ return err
+ }
+ n.setUsersJoinedToRooms(roomToUsers)
+ return nil
+}
+
+// CurrentPosition returns the current stream position
+func (n *Notifier) CurrentPosition() types.StreamPosition {
+ return n.currPos
+}
+
+// setUsersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from
+// these rooms will wake the given users /sync requests. This should be called prior to ANY calls to
+// OnNewEvent (eg on startup) to prevent racing.
+func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
+ // This is just the bulk form of addJoinedUser
+ for roomID, userIDs := range roomIDToUserIDs {
+ if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
+ n.roomIDToJoinedUsers[roomID] = make(userIDSet)
+ }
+ for _, userID := range userIDs {
+ n.roomIDToJoinedUsers[roomID].add(userID)
+ }
+ }
+}
+
+func (n *Notifier) wakeupUser(userID string, newPos types.StreamPosition) {
+ stream := n.fetchUserStream(userID, false)
+ if stream == nil {
+ return
+ }
+ stream.Broadcast(newPos) // wakeup all goroutines Wait()ing on this stream
+}
+
+// fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true,
+// a stream will be made for this user if one doesn't exist and it will be returned. This
+// function does not wait for data to be available on the stream.
+func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStream {
+ stream, ok := n.userStreams[userID]
+ if !ok && makeIfNotExists {
+ // TODO: Unbounded growth of streams (1 per user)
+ stream = NewUserStream(userID, n.currPos)
+ n.userStreams[userID] = stream
+ }
+ return stream
+}
+
+// Not thread-safe: must be called on the OnNewEvent goroutine only
+func (n *Notifier) addJoinedUser(roomID, userID string) {
+ if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
+ n.roomIDToJoinedUsers[roomID] = make(userIDSet)
+ }
+ n.roomIDToJoinedUsers[roomID].add(userID)
+}
+
+// Not thread-safe: must be called on the OnNewEvent goroutine only
+func (n *Notifier) removeJoinedUser(roomID, userID string) {
+ if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
+ n.roomIDToJoinedUsers[roomID] = make(userIDSet)
+ }
+ n.roomIDToJoinedUsers[roomID].remove(userID)
+}
+
+// Not thread-safe: must be called on the OnNewEvent goroutine only
+func (n *Notifier) joinedUsers(roomID string) (userIDs []string) {
+ if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
+ return
+ }
+ return n.roomIDToJoinedUsers[roomID].values()
+}
+
+// removeEmptyUserStreams iterates through the user stream map and removes any
+// that have been empty for a certain amount of time. This is a crude way of
+// ensuring that the userStreams map doesn't grow forver.
+// This should be called when the notifier gets called for whatever reason,
+// the function itself is responsible for ensuring it doesn't iterate too
+// often.
+// NB: Callers should have locked the mutex before calling this function.
+func (n *Notifier) removeEmptyUserStreams() {
+ // Only clean up now and again
+ now := time.Now()
+ if n.lastCleanUpTime.Add(time.Minute).After(now) {
+ return
+ }
+ n.lastCleanUpTime = now
+
+ deleteBefore := now.Add(-5 * time.Minute)
+ for key, value := range n.userStreams {
+ if value.TimeOfLastNonEmpty().Before(deleteBefore) {
+ delete(n.userStreams, key)
+ }
+ }
+}
+
+// A string set, mainly existing for improving clarity of structs in this file.
+type userIDSet map[string]bool
+
+func (s userIDSet) add(str string) {
+ s[str] = true
+}
+
+func (s userIDSet) remove(str string) {
+ delete(s, str)
+}
+
+func (s userIDSet) values() (vals []string) {
+ for str := range s {
+ vals = append(vals, str)
+ }
+ return
+}
diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go
new file mode 100644
index 00000000..4fa54393
--- /dev/null
+++ b/syncapi/sync/notifier_test.go
@@ -0,0 +1,293 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sync
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
+
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+)
+
+var (
+ randomMessageEvent gomatrixserverlib.Event
+ aliceInviteBobEvent gomatrixserverlib.Event
+ bobLeaveEvent gomatrixserverlib.Event
+)
+
+var (
+ streamPositionVeryOld = types.StreamPosition(5)
+ streamPositionBefore = types.StreamPosition(11)
+ streamPositionAfter = types.StreamPosition(12)
+ streamPositionAfter2 = types.StreamPosition(13)
+ roomID = "!test:localhost"
+ alice = "@alice:localhost"
+ bob = "@bob:localhost"
+)
+
+func init() {
+ var err error
+ randomMessageEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{
+ "type": "m.room.message",
+ "content": {
+ "body": "Hello World",
+ "msgtype": "m.text"
+ },
+ "sender": "@noone:localhost",
+ "room_id": "`+roomID+`",
+ "origin_server_ts": 12345,
+ "event_id": "$randomMessageEvent:localhost"
+ }`), false)
+ if err != nil {
+ panic(err)
+ }
+ aliceInviteBobEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{
+ "type": "m.room.member",
+ "state_key": "`+bob+`",
+ "content": {
+ "membership": "invite"
+ },
+ "sender": "`+alice+`",
+ "room_id": "`+roomID+`",
+ "origin_server_ts": 12345,
+ "event_id": "$aliceInviteBobEvent:localhost"
+ }`), false)
+ if err != nil {
+ panic(err)
+ }
+ bobLeaveEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{
+ "type": "m.room.member",
+ "state_key": "`+bob+`",
+ "content": {
+ "membership": "leave"
+ },
+ "sender": "`+bob+`",
+ "room_id": "`+roomID+`",
+ "origin_server_ts": 12345,
+ "event_id": "$bobLeaveEvent:localhost"
+ }`), false)
+ if err != nil {
+ panic(err)
+ }
+}
+
+// Test that the current position is returned if a request is already behind.
+func TestImmediateNotification(t *testing.T) {
+ n := NewNotifier(streamPositionBefore)
+ pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionVeryOld))
+ if err != nil {
+ t.Fatalf("TestImmediateNotification error: %s", err)
+ }
+ if pos != streamPositionBefore {
+ t.Fatalf("TestImmediateNotification want %d, got %d", streamPositionBefore, pos)
+ }
+}
+
+// Test that new events to a joined room unblocks the request.
+func TestNewEventAndJoinedToRoom(t *testing.T) {
+ n := NewNotifier(streamPositionBefore)
+ n.setUsersJoinedToRooms(map[string][]string{
+ roomID: {alice, bob},
+ })
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
+ if err != nil {
+ t.Errorf("TestNewEventAndJoinedToRoom error: %s", err)
+ }
+ if pos != streamPositionAfter {
+ t.Errorf("TestNewEventAndJoinedToRoom want %d, got %d", streamPositionAfter, pos)
+ }
+ wg.Done()
+ }()
+
+ stream := n.fetchUserStream(bob, true)
+ waitForBlocking(stream, 1)
+
+ n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter)
+
+ wg.Wait()
+}
+
+// Test that an invite unblocks the request
+func TestNewInviteEventForUser(t *testing.T) {
+ n := NewNotifier(streamPositionBefore)
+ n.setUsersJoinedToRooms(map[string][]string{
+ roomID: {alice, bob},
+ })
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
+ if err != nil {
+ t.Errorf("TestNewInviteEventForUser error: %s", err)
+ }
+ if pos != streamPositionAfter {
+ t.Errorf("TestNewInviteEventForUser want %d, got %d", streamPositionAfter, pos)
+ }
+ wg.Done()
+ }()
+
+ stream := n.fetchUserStream(bob, true)
+ waitForBlocking(stream, 1)
+
+ n.OnNewEvent(&aliceInviteBobEvent, "", streamPositionAfter)
+
+ wg.Wait()
+}
+
+// Test that all blocked requests get woken up on a new event.
+func TestMultipleRequestWakeup(t *testing.T) {
+ n := NewNotifier(streamPositionBefore)
+ n.setUsersJoinedToRooms(map[string][]string{
+ roomID: {alice, bob},
+ })
+
+ var wg sync.WaitGroup
+ wg.Add(3)
+ poll := func() {
+ pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
+ if err != nil {
+ t.Errorf("TestMultipleRequestWakeup error: %s", err)
+ }
+ if pos != streamPositionAfter {
+ t.Errorf("TestMultipleRequestWakeup want %d, got %d", streamPositionAfter, pos)
+ }
+ wg.Done()
+ }
+ go poll()
+ go poll()
+ go poll()
+
+ stream := n.fetchUserStream(bob, true)
+ waitForBlocking(stream, 3)
+
+ n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter)
+
+ wg.Wait()
+
+ numWaiting := stream.NumWaiting()
+ if numWaiting != 0 {
+ t.Errorf("TestMultipleRequestWakeup NumWaiting() want 0, got %d", numWaiting)
+ }
+}
+
+// Test that you stop getting woken up when you leave a room.
+func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
+ // listen as bob. Make bob leave room. Make alice send event to room.
+ // Make sure alice gets woken up only and not bob as well.
+ n := NewNotifier(streamPositionBefore)
+ n.setUsersJoinedToRooms(map[string][]string{
+ roomID: {alice, bob},
+ })
+
+ var leaveWG sync.WaitGroup
+
+ // Make bob leave the room
+ leaveWG.Add(1)
+ go func() {
+ pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
+ if err != nil {
+ t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
+ }
+ if pos != streamPositionAfter {
+ t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter, pos)
+ }
+ leaveWG.Done()
+ }()
+ bobStream := n.fetchUserStream(bob, true)
+ waitForBlocking(bobStream, 1)
+ n.OnNewEvent(&bobLeaveEvent, "", streamPositionAfter)
+ leaveWG.Wait()
+
+ // send an event into the room. Make sure alice gets it. Bob should not.
+ var aliceWG sync.WaitGroup
+ aliceStream := n.fetchUserStream(alice, true)
+ aliceWG.Add(1)
+ go func() {
+ pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionAfter))
+ if err != nil {
+ t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
+ }
+ if pos != streamPositionAfter2 {
+ t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter2, pos)
+ }
+ aliceWG.Done()
+ }()
+
+ go func() {
+ // this should timeout with an error (but the main goroutine won't wait for the timeout explicitly)
+ _, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionAfter))
+ if err == nil {
+ t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil")
+ }
+ }()
+
+ waitForBlocking(aliceStream, 1)
+ waitForBlocking(bobStream, 1)
+
+ n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter2)
+ aliceWG.Wait()
+
+ // it's possible that at this point alice has been informed and bob is about to be informed, so wait
+ // for a fraction of a second to account for this race
+ time.Sleep(1 * time.Millisecond)
+}
+
+// same as Notifier.WaitForEvents but with a timeout.
+func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
+ listener := n.GetListener(req)
+ defer listener.Close()
+
+ select {
+ case <-time.After(5 * time.Second):
+ return types.StreamPosition(0), fmt.Errorf(
+ "waitForEvents timed out waiting for %s (pos=%d)", req.device.UserID, req.since,
+ )
+ case <-listener.GetNotifyChannel(*req.since):
+ p := listener.GetStreamPosition()
+ return p, nil
+ }
+}
+
+// Wait until something is Wait()ing on the user stream.
+func waitForBlocking(s *UserStream, numBlocking uint) {
+ for numBlocking != s.NumWaiting() {
+ // This is horrible but I don't want to add a signalling mechanism JUST for testing.
+ time.Sleep(1 * time.Microsecond)
+ }
+}
+
+func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest {
+ return syncRequest{
+ device: authtypes.Device{UserID: userID},
+ timeout: 1 * time.Minute,
+ since: &since,
+ wantFullState: false,
+ limit: defaultTimelineLimit,
+ log: util.GetLogger(context.TODO()),
+ ctx: context.TODO(),
+ }
+}
diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go
new file mode 100644
index 00000000..35a15f6f
--- /dev/null
+++ b/syncapi/sync/request.go
@@ -0,0 +1,87 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sync
+
+import (
+ "context"
+ "net/http"
+ "strconv"
+ "time"
+
+ "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
+
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/util"
+ log "github.com/sirupsen/logrus"
+)
+
+const defaultSyncTimeout = time.Duration(0)
+const defaultTimelineLimit = 20
+
+// syncRequest represents a /sync request, with sensible defaults/sanity checks applied.
+type syncRequest struct {
+ ctx context.Context
+ device authtypes.Device
+ limit int
+ timeout time.Duration
+ since *types.StreamPosition // nil means that no since token was supplied
+ wantFullState bool
+ log *log.Entry
+}
+
+func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, error) {
+ timeout := getTimeout(req.URL.Query().Get("timeout"))
+ fullState := req.URL.Query().Get("full_state")
+ wantFullState := fullState != "" && fullState != "false"
+ since, err := getSyncStreamPosition(req.URL.Query().Get("since"))
+ if err != nil {
+ return nil, err
+ }
+ // TODO: Additional query params: set_presence, filter
+ return &syncRequest{
+ ctx: req.Context(),
+ device: device,
+ timeout: timeout,
+ since: since,
+ wantFullState: wantFullState,
+ limit: defaultTimelineLimit, // TODO: read from filter
+ log: util.GetLogger(req.Context()),
+ }, nil
+}
+
+func getTimeout(timeoutMS string) time.Duration {
+ if timeoutMS == "" {
+ return defaultSyncTimeout
+ }
+ i, err := strconv.Atoi(timeoutMS)
+ if err != nil {
+ return defaultSyncTimeout
+ }
+ return time.Duration(i) * time.Millisecond
+}
+
+// getSyncStreamPosition tries to parse a 'since' token taken from the API to a
+// stream position. If the string is empty then (nil, nil) is returned.
+func getSyncStreamPosition(since string) (*types.StreamPosition, error) {
+ if since == "" {
+ return nil, nil
+ }
+ i, err := strconv.Atoi(since)
+ if err != nil {
+ return nil, err
+ }
+ token := types.StreamPosition(i)
+ return &token, nil
+}
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
new file mode 100644
index 00000000..89137eb5
--- /dev/null
+++ b/syncapi/sync/requestpool.go
@@ -0,0 +1,216 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sync
+
+import (
+ "net/http"
+ "time"
+
+ "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
+ "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
+ "github.com/matrix-org/dendrite/clientapi/httputil"
+ "github.com/matrix-org/dendrite/clientapi/jsonerror"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ log "github.com/sirupsen/logrus"
+)
+
+// RequestPool manages HTTP long-poll connections for /sync
+type RequestPool struct {
+ db *storage.SyncServerDatabase
+ accountDB *accounts.Database
+ notifier *Notifier
+}
+
+// NewRequestPool makes a new RequestPool
+func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier, adb *accounts.Database) *RequestPool {
+ return &RequestPool{db, adb, n}
+}
+
+// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
+// called in a dedicated goroutine for this request. This function will block the goroutine
+// until a response is ready, or it times out.
+func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtypes.Device) util.JSONResponse {
+ var syncData *types.Response
+
+ // Extract values from request
+ logger := util.GetLogger(req.Context())
+ userID := device.UserID
+ syncReq, err := newSyncRequest(req, *device)
+ if err != nil {
+ return util.JSONResponse{
+ Code: http.StatusBadRequest,
+ JSON: jsonerror.Unknown(err.Error()),
+ }
+ }
+ logger.WithFields(log.Fields{
+ "userID": userID,
+ "since": syncReq.since,
+ "timeout": syncReq.timeout,
+ }).Info("Incoming /sync request")
+
+ currPos := rp.notifier.CurrentPosition()
+
+ // If this is an initial sync or timeout=0 we return immediately
+ if syncReq.since == nil || syncReq.timeout == 0 {
+ syncData, err = rp.currentSyncForUser(*syncReq, currPos)
+ if err != nil {
+ return httputil.LogThenError(req, err)
+ }
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: syncData,
+ }
+ }
+
+ // Otherwise, we wait for the notifier to tell us if something *may* have
+ // happened. We loop in case it turns out that nothing did happen.
+
+ timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above
+ defer timer.Stop()
+
+ userStreamListener := rp.notifier.GetListener(*syncReq)
+ defer userStreamListener.Close()
+
+ // We need the loop in case userStreamListener wakes up even if there isn't
+ // anything to send down. In this case, we'll jump out of the select but
+ // don't want to send anything back until we get some actual content to
+ // respond with, so we skip the return an go back to waiting for content to
+ // be sent down or the request timing out.
+ var hasTimedOut bool
+ for {
+ select {
+ // Wait for notifier to wake us up
+ case <-userStreamListener.GetNotifyChannel(currPos):
+ currPos = userStreamListener.GetStreamPosition()
+ // Or for timeout to expire
+ case <-timer.C:
+ // We just need to ensure we get out of the select after reaching the
+ // timeout, but there's nothing specific we want to do in this case
+ // apart from that, so we do nothing except stating we're timing out
+ // and need to respond.
+ hasTimedOut = true
+ // Or for the request to be cancelled
+ case <-req.Context().Done():
+ return httputil.LogThenError(req, req.Context().Err())
+ }
+
+ // Note that we don't time out during calculation of sync
+ // response. This ensures that we don't waste the hard work
+ // of calculating the sync only to get timed out before we
+ // can respond
+
+ syncData, err = rp.currentSyncForUser(*syncReq, currPos)
+ if err != nil {
+ return httputil.LogThenError(req, err)
+ }
+
+ if !syncData.IsEmpty() || hasTimedOut {
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: syncData,
+ }
+ }
+ }
+}
+
+func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) {
+ // TODO: handle ignored users
+ if req.since == nil {
+ res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit)
+ } else {
+ res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, currentPos, req.limit)
+ }
+
+ if err != nil {
+ return
+ }
+
+ res, err = rp.appendAccountData(res, req.device.UserID, req, currentPos)
+ return
+}
+
+func (rp *RequestPool) appendAccountData(
+ data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
+) (*types.Response, error) {
+ // TODO: Account data doesn't have a sync position of its own, meaning that
+ // account data might be sent multiple time to the client if multiple account
+ // data keys were set between two message. This isn't a huge issue since the
+ // duplicate data doesn't represent a huge quantity of data, but an optimisation
+ // here would be making sure each data is sent only once to the client.
+ localpart, _, err := gomatrixserverlib.SplitID('@', userID)
+ if err != nil {
+ return nil, err
+ }
+
+ if req.since == nil {
+ // If this is the initial sync, we don't need to check if a data has
+ // already been sent. Instead, we send the whole batch.
+ var global []gomatrixserverlib.ClientEvent
+ var rooms map[string][]gomatrixserverlib.ClientEvent
+ global, rooms, err = rp.accountDB.GetAccountData(req.ctx, localpart)
+ if err != nil {
+ return nil, err
+ }
+ data.AccountData.Events = global
+
+ for r, j := range data.Rooms.Join {
+ if len(rooms[r]) > 0 {
+ j.AccountData.Events = rooms[r]
+ data.Rooms.Join[r] = j
+ }
+ }
+
+ return data, nil
+ }
+
+ // Sync is not initial, get all account data since the latest sync
+ dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, *req.since, currentPos)
+ if err != nil {
+ return nil, err
+ }
+
+ if len(dataTypes) == 0 {
+ return data, nil
+ }
+
+ // Iterate over the rooms
+ for roomID, dataTypes := range dataTypes {
+ events := []gomatrixserverlib.ClientEvent{}
+ // Request the missing data from the database
+ for _, dataType := range dataTypes {
+ evs, err := rp.accountDB.GetAccountDataByType(
+ req.ctx, localpart, roomID, dataType,
+ )
+ if err != nil {
+ return nil, err
+ }
+ events = append(events, evs...)
+ }
+
+ // Append the data to the response
+ if len(roomID) > 0 {
+ jr := data.Rooms.Join[roomID]
+ jr.AccountData.Events = events
+ data.Rooms.Join[roomID] = jr
+ } else {
+ data.AccountData.Events = events
+ }
+ }
+
+ return data, nil
+}
diff --git a/syncapi/sync/userstream.go b/syncapi/sync/userstream.go
new file mode 100644
index 00000000..77d09c20
--- /dev/null
+++ b/syncapi/sync/userstream.go
@@ -0,0 +1,162 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sync
+
+import (
+ "context"
+ "runtime"
+ "sync"
+ "time"
+
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/util"
+)
+
+// UserStream represents a communication mechanism between the /sync request goroutine
+// and the underlying sync server goroutines.
+// Goroutines can get a UserStreamListener to wait for updates, and can Broadcast()
+// updates.
+type UserStream struct {
+ UserID string
+ // The lock that protects changes to this struct
+ lock sync.Mutex
+ // Closed when there is an update.
+ signalChannel chan struct{}
+ // The last stream position that there may have been an update for the suser
+ pos types.StreamPosition
+ // The last time when we had some listeners waiting
+ timeOfLastChannel time.Time
+ // The number of listeners waiting
+ numWaiting uint
+}
+
+// UserStreamListener allows a sync request to wait for updates for a user.
+type UserStreamListener struct {
+ userStream *UserStream
+
+ // Whether the stream has been closed
+ hasClosed bool
+}
+
+// NewUserStream creates a new user stream
+func NewUserStream(userID string, currPos types.StreamPosition) *UserStream {
+ return &UserStream{
+ UserID: userID,
+ timeOfLastChannel: time.Now(),
+ pos: currPos,
+ signalChannel: make(chan struct{}),
+ }
+}
+
+// GetListener returns UserStreamListener that a sync request can use to wait
+// for new updates with.
+// UserStreamListener must be closed
+func (s *UserStream) GetListener(ctx context.Context) UserStreamListener {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.numWaiting++ // We decrement when UserStreamListener is closed
+
+ listener := UserStreamListener{
+ userStream: s,
+ }
+
+ // Lets be a bit paranoid here and check that Close() is being called
+ runtime.SetFinalizer(&listener, func(l *UserStreamListener) {
+ if !l.hasClosed {
+ util.GetLogger(ctx).Warn("Didn't call Close on UserStreamListener")
+ l.Close()
+ }
+ })
+
+ return listener
+}
+
+// Broadcast a new stream position for this user.
+func (s *UserStream) Broadcast(pos types.StreamPosition) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.pos = pos
+
+ close(s.signalChannel)
+
+ s.signalChannel = make(chan struct{})
+}
+
+// NumWaiting returns the number of goroutines waiting for waiting for updates.
+// Used for metrics and testing.
+func (s *UserStream) NumWaiting() uint {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ return s.numWaiting
+}
+
+// TimeOfLastNonEmpty returns the last time that the number of waiting listeners
+// was non-empty, may be time.Now() if number of waiting listeners is currently
+// non-empty.
+func (s *UserStream) TimeOfLastNonEmpty() time.Time {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ if s.numWaiting > 0 {
+ return time.Now()
+ }
+
+ return s.timeOfLastChannel
+}
+
+// GetStreamPosition returns last stream position which the UserStream was
+// notified about
+func (s *UserStreamListener) GetStreamPosition() types.StreamPosition {
+ s.userStream.lock.Lock()
+ defer s.userStream.lock.Unlock()
+
+ return s.userStream.pos
+}
+
+// GetNotifyChannel returns a channel that is closed when there may be an
+// update for the user.
+// sincePos specifies from which point we want to be notified about. If there
+// has already been an update after sincePos we'll return a closed channel
+// immediately.
+func (s *UserStreamListener) GetNotifyChannel(sincePos types.StreamPosition) <-chan struct{} {
+ s.userStream.lock.Lock()
+ defer s.userStream.lock.Unlock()
+
+ if sincePos < s.userStream.pos {
+ // If the listener is behind, i.e. missed a potential update, then we
+ // want them to wake up immediately. We do this by returning a new
+ // closed stream, which returns immediately when selected.
+ closedChannel := make(chan struct{})
+ close(closedChannel)
+ return closedChannel
+ }
+
+ return s.userStream.signalChannel
+}
+
+// Close cleans up resources used
+func (s *UserStreamListener) Close() {
+ s.userStream.lock.Lock()
+ defer s.userStream.lock.Unlock()
+
+ if !s.hasClosed {
+ s.userStream.numWaiting--
+ s.userStream.timeOfLastChannel = time.Now()
+ }
+
+ s.hasClosed = true
+}
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
new file mode 100644
index 00000000..2db54c3c
--- /dev/null
+++ b/syncapi/syncapi.go
@@ -0,0 +1,75 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package syncapi
+
+import (
+ "context"
+
+ "github.com/sirupsen/logrus"
+
+ "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
+ "github.com/matrix-org/dendrite/common/basecomponent"
+ "github.com/matrix-org/dendrite/roomserver/api"
+
+ "github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
+ "github.com/matrix-org/dendrite/syncapi/consumers"
+ "github.com/matrix-org/dendrite/syncapi/routing"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/sync"
+ "github.com/matrix-org/dendrite/syncapi/types"
+)
+
+// SetupSyncAPIComponent sets up and registers HTTP handlers for the SyncAPI
+// component.
+func SetupSyncAPIComponent(
+ base *basecomponent.BaseDendrite,
+ deviceDB *devices.Database,
+ accountsDB *accounts.Database,
+ queryAPI api.RoomserverQueryAPI,
+) {
+ syncDB, err := storage.NewSyncServerDatabase(string(base.Cfg.Database.SyncAPI))
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to connect to sync db")
+ }
+
+ pos, err := syncDB.SyncStreamPosition(context.Background())
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to get stream position")
+ }
+
+ notifier := sync.NewNotifier(types.StreamPosition(pos))
+ err = notifier.Load(context.Background(), syncDB)
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to start notifier")
+ }
+
+ requestPool := sync.NewRequestPool(syncDB, notifier, accountsDB)
+
+ roomConsumer := consumers.NewOutputRoomEventConsumer(
+ base.Cfg, base.KafkaConsumer, notifier, syncDB, queryAPI,
+ )
+ if err = roomConsumer.Start(); err != nil {
+ logrus.WithError(err).Panicf("failed to start room server consumer")
+ }
+
+ clientConsumer := consumers.NewOutputClientDataConsumer(
+ base.Cfg, base.KafkaConsumer, notifier, syncDB,
+ )
+ if err = clientConsumer.Start(); err != nil {
+ logrus.WithError(err).Panicf("failed to start client data consumer")
+ }
+
+ routing.Setup(base.APIMux, requestPool, syncDB, deviceDB)
+}
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
new file mode 100644
index 00000000..d0b1c38a
--- /dev/null
+++ b/syncapi/types/types.go
@@ -0,0 +1,147 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package types
+
+import (
+ "encoding/json"
+ "strconv"
+
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// StreamPosition represents the offset in the sync stream a client is at.
+type StreamPosition int64
+
+// String implements the Stringer interface.
+func (sp StreamPosition) String() string {
+ return strconv.FormatInt(int64(sp), 10)
+}
+
+// PrevEventRef represents a reference to a previous event in a state event upgrade
+type PrevEventRef struct {
+ PrevContent json.RawMessage `json:"prev_content"`
+ ReplacesState string `json:"replaces_state"`
+ PrevSender string `json:"prev_sender"`
+}
+
+// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
+type Response struct {
+ NextBatch string `json:"next_batch"`
+ AccountData struct {
+ Events []gomatrixserverlib.ClientEvent `json:"events"`
+ } `json:"account_data"`
+ Presence struct {
+ Events []gomatrixserverlib.ClientEvent `json:"events"`
+ } `json:"presence"`
+ Rooms struct {
+ Join map[string]JoinResponse `json:"join"`
+ Invite map[string]InviteResponse `json:"invite"`
+ Leave map[string]LeaveResponse `json:"leave"`
+ } `json:"rooms"`
+}
+
+// NewResponse creates an empty response with initialised maps.
+func NewResponse(pos StreamPosition) *Response {
+ res := Response{}
+ // Make sure we send the next_batch as a string. We don't want to confuse clients by sending this
+ // as an integer even though (at the moment) it is.
+ res.NextBatch = pos.String()
+ // Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section,
+ // so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.
+ res.Rooms.Join = make(map[string]JoinResponse)
+ res.Rooms.Invite = make(map[string]InviteResponse)
+ res.Rooms.Leave = make(map[string]LeaveResponse)
+
+ // Also pre-intialise empty slices or else we'll insert 'null' instead of '[]' for the value.
+ // TODO: We really shouldn't have to do all this to coerce encoding/json to Do The Right Thing. We should
+ // really be using our own Marshal/Unmarshal implementations otherwise this may prove to be a CPU bottleneck.
+ // This also applies to NewJoinResponse, NewInviteResponse and NewLeaveResponse.
+ res.AccountData.Events = make([]gomatrixserverlib.ClientEvent, 0)
+ res.Presence.Events = make([]gomatrixserverlib.ClientEvent, 0)
+
+ return &res
+}
+
+// IsEmpty returns true if the response is empty, i.e. used to decided whether
+// to return the response immediately to the client or to wait for more data.
+func (r *Response) IsEmpty() bool {
+ return len(r.Rooms.Join) == 0 &&
+ len(r.Rooms.Invite) == 0 &&
+ len(r.Rooms.Leave) == 0 &&
+ len(r.AccountData.Events) == 0 &&
+ len(r.Presence.Events) == 0
+}
+
+// JoinResponse represents a /sync response for a room which is under the 'join' key.
+type JoinResponse struct {
+ State struct {
+ Events []gomatrixserverlib.ClientEvent `json:"events"`
+ } `json:"state"`
+ Timeline struct {
+ Events []gomatrixserverlib.ClientEvent `json:"events"`
+ Limited bool `json:"limited"`
+ PrevBatch string `json:"prev_batch"`
+ } `json:"timeline"`
+ Ephemeral struct {
+ Events []gomatrixserverlib.ClientEvent `json:"events"`
+ } `json:"ephemeral"`
+ AccountData struct {
+ Events []gomatrixserverlib.ClientEvent `json:"events"`
+ } `json:"account_data"`
+}
+
+// NewJoinResponse creates an empty response with initialised arrays.
+func NewJoinResponse() *JoinResponse {
+ res := JoinResponse{}
+ res.State.Events = make([]gomatrixserverlib.ClientEvent, 0)
+ res.Timeline.Events = make([]gomatrixserverlib.ClientEvent, 0)
+ res.Ephemeral.Events = make([]gomatrixserverlib.ClientEvent, 0)
+ res.AccountData.Events = make([]gomatrixserverlib.ClientEvent, 0)
+ return &res
+}
+
+// InviteResponse represents a /sync response for a room which is under the 'invite' key.
+type InviteResponse struct {
+ InviteState struct {
+ Events []gomatrixserverlib.ClientEvent `json:"events"`
+ } `json:"invite_state"`
+}
+
+// NewInviteResponse creates an empty response with initialised arrays.
+func NewInviteResponse() *InviteResponse {
+ res := InviteResponse{}
+ res.InviteState.Events = make([]gomatrixserverlib.ClientEvent, 0)
+ return &res
+}
+
+// LeaveResponse represents a /sync response for a room which is under the 'leave' key.
+type LeaveResponse struct {
+ State struct {
+ Events []gomatrixserverlib.ClientEvent `json:"events"`
+ } `json:"state"`
+ Timeline struct {
+ Events []gomatrixserverlib.ClientEvent `json:"events"`
+ Limited bool `json:"limited"`
+ PrevBatch string `json:"prev_batch"`
+ } `json:"timeline"`
+}
+
+// NewLeaveResponse creates an empty response with initialised arrays.
+func NewLeaveResponse() *LeaveResponse {
+ res := LeaveResponse{}
+ res.State.Events = make([]gomatrixserverlib.ClientEvent, 0)
+ res.Timeline.Events = make([]gomatrixserverlib.ClientEvent, 0)
+ return &res
+}