aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/clientapi.go9
-rw-r--r--syncapi/consumers/roomserver.go12
-rw-r--r--syncapi/consumers/typingserver.go96
-rw-r--r--syncapi/routing/routing.go2
-rw-r--r--syncapi/routing/state.go4
-rw-r--r--syncapi/storage/account_data_table.go4
-rw-r--r--syncapi/storage/output_room_events_table.go11
-rw-r--r--syncapi/storage/syncserver.go342
-rw-r--r--syncapi/sync/notifier.go68
-rw-r--r--syncapi/sync/notifier_test.go130
-rw-r--r--syncapi/sync/request.go44
-rw-r--r--syncapi/sync/requestpool.go20
-rw-r--r--syncapi/sync/userstream.go18
-rw-r--r--syncapi/syncapi.go16
-rw-r--r--syncapi/types/types.go43
15 files changed, 598 insertions, 221 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index d05a7692..f0db5642 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
+ "github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
)
@@ -29,7 +30,7 @@ import (
// OutputClientDataConsumer consumes events that originated in the client API server.
type OutputClientDataConsumer struct {
clientAPIConsumer *common.ContinualConsumer
- db *storage.SyncServerDatabase
+ db *storage.SyncServerDatasource
notifier *sync.Notifier
}
@@ -38,7 +39,7 @@ func NewOutputClientDataConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
- store *storage.SyncServerDatabase,
+ store *storage.SyncServerDatasource,
) *OutputClientDataConsumer {
consumer := common.ContinualConsumer{
@@ -78,7 +79,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": output.RoomID,
}).Info("received data from client API server")
- syncStreamPos, err := s.db.UpsertAccountData(
+ pduPos, err := s.db.UpsertAccountData(
context.TODO(), string(msg.Key), output.RoomID, output.Type,
)
if err != nil {
@@ -89,7 +90,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Panicf("could not save account data")
}
- s.notifier.OnNewEvent(nil, string(msg.Key), syncStreamPos)
+ s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.SyncPosition{PDUPosition: pduPos})
return nil
}
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 1866a966..e4f1ab46 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -33,7 +33,7 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer
- db *storage.SyncServerDatabase
+ db *storage.SyncServerDatasource
notifier *sync.Notifier
query api.RoomserverQueryAPI
}
@@ -43,7 +43,7 @@ func NewOutputRoomEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
- store *storage.SyncServerDatabase,
+ store *storage.SyncServerDatasource,
queryAPI api.RoomserverQueryAPI,
) *OutputRoomEventConsumer {
@@ -126,7 +126,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}
}
- syncStreamPos, err := s.db.WriteEvent(
+ pduPos, err := s.db.WriteEvent(
ctx,
&ev,
addsStateEvents,
@@ -144,7 +144,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}).Panicf("roomserver output log: write event failure")
return nil
}
- s.notifier.OnNewEvent(&ev, "", types.StreamPosition(syncStreamPos))
+ s.notifier.OnNewEvent(&ev, "", nil, types.SyncPosition{PDUPosition: pduPos})
return nil
}
@@ -152,7 +152,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
func (s *OutputRoomEventConsumer) onNewInviteEvent(
ctx context.Context, msg api.OutputNewInviteEvent,
) error {
- syncStreamPos, err := s.db.AddInviteEvent(ctx, msg.Event)
+ pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
@@ -161,7 +161,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure")
return nil
}
- s.notifier.OnNewEvent(&msg.Event, "", syncStreamPos)
+ s.notifier.OnNewEvent(&msg.Event, "", nil, types.SyncPosition{PDUPosition: pduPos})
return nil
}
diff --git a/syncapi/consumers/typingserver.go b/syncapi/consumers/typingserver.go
new file mode 100644
index 00000000..5d998a18
--- /dev/null
+++ b/syncapi/consumers/typingserver.go
@@ -0,0 +1,96 @@
+// Copyright 2019 Alex Chen
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/common/config"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/sync"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/dendrite/typingserver/api"
+ log "github.com/sirupsen/logrus"
+ sarama "gopkg.in/Shopify/sarama.v1"
+)
+
+// OutputTypingEventConsumer consumes events that originated in the typing server.
+type OutputTypingEventConsumer struct {
+ typingConsumer *common.ContinualConsumer
+ db *storage.SyncServerDatasource
+ notifier *sync.Notifier
+}
+
+// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
+// Call Start() to begin consuming from the typing server.
+func NewOutputTypingEventConsumer(
+ cfg *config.Dendrite,
+ kafkaConsumer sarama.Consumer,
+ n *sync.Notifier,
+ store *storage.SyncServerDatasource,
+) *OutputTypingEventConsumer {
+
+ consumer := common.ContinualConsumer{
+ Topic: string(cfg.Kafka.Topics.OutputTypingEvent),
+ Consumer: kafkaConsumer,
+ PartitionStore: store,
+ }
+
+ s := &OutputTypingEventConsumer{
+ typingConsumer: &consumer,
+ db: store,
+ notifier: n,
+ }
+
+ consumer.ProcessMessage = s.onMessage
+
+ return s
+}
+
+// Start consuming from typing api
+func (s *OutputTypingEventConsumer) Start() error {
+ s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
+ s.notifier.OnNewEvent(nil, roomID, nil, types.SyncPosition{TypingPosition: latestSyncPosition})
+ })
+
+ return s.typingConsumer.Start()
+}
+
+func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
+ var output api.OutputTypingEvent
+ if err := json.Unmarshal(msg.Value, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("typing server output log: message parse failure")
+ return nil
+ }
+
+ log.WithFields(log.Fields{
+ "room_id": output.Event.RoomID,
+ "user_id": output.Event.UserID,
+ "typing": output.Event.Typing,
+ }).Debug("received data from typing server")
+
+ var typingPos int64
+ typingEvent := output.Event
+ if typingEvent.Typing {
+ typingPos = s.db.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime)
+ } else {
+ typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
+ }
+
+ s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.SyncPosition{TypingPosition: typingPos})
+ return nil
+}
diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go
index cbdcfb6b..0f5019fc 100644
--- a/syncapi/routing/routing.go
+++ b/syncapi/routing/routing.go
@@ -34,7 +34,7 @@ const pathPrefixR0 = "/_matrix/client/r0"
// Due to Setup being used to call many other functions, a gocyclo nolint is
// applied:
// nolint: gocyclo
-func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServerDatabase, deviceDB *devices.Database) {
+func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServerDatasource, deviceDB *devices.Database) {
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
authData := auth.Data{
diff --git a/syncapi/routing/state.go b/syncapi/routing/state.go
index 6b98a0b7..5571a052 100644
--- a/syncapi/routing/state.go
+++ b/syncapi/routing/state.go
@@ -40,7 +40,7 @@ type stateEventInStateResp struct {
// TODO: Check if the user is in the room. If not, check if the room's history
// is publicly visible. Current behaviour is returning an empty array if the
// user cannot see the room's history.
-func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatabase, roomID string) util.JSONResponse {
+func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatasource, roomID string) util.JSONResponse {
// TODO(#287): Auth request and handle the case where the user has left (where
// we should return the state at the poin they left)
@@ -84,7 +84,7 @@ func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatabase, r
// /rooms/{roomID}/state/{type}/{statekey} request. It will look in current
// state to see if there is an event with that type and state key, if there
// is then (by default) we return the content, otherwise a 404.
-func OnIncomingStateTypeRequest(req *http.Request, db *storage.SyncServerDatabase, roomID string, evType, stateKey string) util.JSONResponse {
+func OnIncomingStateTypeRequest(req *http.Request, db *storage.SyncServerDatasource, roomID string, evType, stateKey string) util.JSONResponse {
// TODO(#287): Auth request and handle the case where the user has left (where
// we should return the state at the poin they left)
diff --git a/syncapi/storage/account_data_table.go b/syncapi/storage/account_data_table.go
index d4d74d15..9b73ce7d 100644
--- a/syncapi/storage/account_data_table.go
+++ b/syncapi/storage/account_data_table.go
@@ -19,8 +19,6 @@ import (
"database/sql"
"github.com/matrix-org/dendrite/common"
-
- "github.com/matrix-org/dendrite/syncapi/types"
)
const accountDataSchema = `
@@ -94,7 +92,7 @@ func (s *accountDataStatements) insertAccountData(
func (s *accountDataStatements) selectAccountDataInRange(
ctx context.Context,
userID string,
- oldPos, newPos types.StreamPosition,
+ oldPos, newPos int64,
) (data map[string][]string, err error) {
data = make(map[string][]string)
diff --git a/syncapi/storage/output_room_events_table.go b/syncapi/storage/output_room_events_table.go
index 035db988..06df017c 100644
--- a/syncapi/storage/output_room_events_table.go
+++ b/syncapi/storage/output_room_events_table.go
@@ -23,7 +23,6 @@ import (
"github.com/lib/pq"
"github.com/matrix-org/dendrite/common"
- "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
@@ -109,11 +108,11 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
return
}
-// selectStateInRange returns the state events between the two given stream positions, exclusive of oldPos, inclusive of newPos.
+// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.
func (s *outputRoomEventsStatements) selectStateInRange(
- ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition,
+ ctx context.Context, txn *sql.Tx, oldPos, newPos int64,
) (map[string]map[string]bool, map[string]streamEvent, error) {
stmt := common.TxStmt(txn, s.selectStateInRangeStmt)
@@ -171,7 +170,7 @@ func (s *outputRoomEventsStatements) selectStateInRange(
eventIDToEvent[ev.EventID()] = streamEvent{
Event: ev,
- streamPosition: types.StreamPosition(streamPos),
+ streamPosition: streamPos,
}
}
@@ -223,7 +222,7 @@ func (s *outputRoomEventsStatements) insertEvent(
// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'.
func (s *outputRoomEventsStatements) selectRecentEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, fromPos, toPos types.StreamPosition, limit int,
+ roomID string, fromPos, toPos int64, limit int,
) ([]streamEvent, error) {
stmt := common.TxStmt(txn, s.selectRecentEventsStmt)
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
@@ -286,7 +285,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) {
result = append(result, streamEvent{
Event: ev,
- streamPosition: types.StreamPosition(streamPos),
+ streamPosition: streamPos,
transactionID: transactionID,
})
}
diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/syncserver.go
index b0655a0a..b4d7ccbd 100644
--- a/syncapi/storage/syncserver.go
+++ b/syncapi/storage/syncserver.go
@@ -17,7 +17,10 @@ package storage
import (
"context"
"database/sql"
+ "encoding/json"
"fmt"
+ "strconv"
+ "time"
"github.com/sirupsen/logrus"
@@ -28,6 +31,7 @@ import (
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -35,33 +39,35 @@ type stateDelta struct {
roomID string
stateEvents []gomatrixserverlib.Event
membership string
- // The stream position of the latest membership event for this user, if applicable.
+ // The PDU stream position of the latest membership event for this user, if applicable.
// Can be 0 if there is no membership event in this delta.
- membershipPos types.StreamPosition
+ membershipPos int64
}
-// Same as gomatrixserverlib.Event but also has the stream position for this event.
+// Same as gomatrixserverlib.Event but also has the PDU stream position for this event.
type streamEvent struct {
gomatrixserverlib.Event
- streamPosition types.StreamPosition
+ streamPosition int64
transactionID *api.TransactionID
}
-// SyncServerDatabase represents a sync server database
-type SyncServerDatabase struct {
+// SyncServerDatabase represents a sync server datasource which manages
+// both the database for PDUs and caches for EDUs.
+type SyncServerDatasource struct {
db *sql.DB
common.PartitionOffsetStatements
accountData accountDataStatements
events outputRoomEventsStatements
roomstate currentRoomStateStatements
invites inviteEventsStatements
+ typingCache *cache.TypingCache
}
// NewSyncServerDatabase creates a new sync server database
-func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
- var d SyncServerDatabase
+func NewSyncServerDatasource(dbDataSourceName string) (*SyncServerDatasource, error) {
+ var d SyncServerDatasource
var err error
- if d.db, err = sql.Open("postgres", dataSourceName); err != nil {
+ if d.db, err = sql.Open("postgres", dbDataSourceName); err != nil {
return nil, err
}
if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil {
@@ -79,11 +85,12 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
if err := d.invites.prepare(d.db); err != nil {
return nil, err
}
+ d.typingCache = cache.NewTypingCache()
return &d, nil
}
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
-func (d *SyncServerDatabase) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) {
+func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) {
return d.roomstate.selectJoinedUsers(ctx)
}
@@ -92,7 +99,7 @@ func (d *SyncServerDatabase) AllJoinedUsersInRooms(ctx context.Context) (map[str
// If an event is not found in the database then it will be omitted from the list.
// Returns an error if there was a problem talking with the database.
// Does not include any transaction IDs in the returned events.
-func (d *SyncServerDatabase) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) {
+func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) {
streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs)
if err != nil {
return nil, err
@@ -104,38 +111,38 @@ func (d *SyncServerDatabase) Events(ctx context.Context, eventIDs []string) ([]g
}
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
-// when generating the stream position for this event. Returns the sync stream position for the inserted event.
+// when generating the sync stream position for this event. Returns the sync stream position for the inserted event.
// Returns an error if there was a problem inserting this event.
-func (d *SyncServerDatabase) WriteEvent(
+func (d *SyncServerDatasource) WriteEvent(
ctx context.Context,
ev *gomatrixserverlib.Event,
addStateEvents []gomatrixserverlib.Event,
addStateEventIDs, removeStateEventIDs []string,
transactionID *api.TransactionID,
-) (streamPos types.StreamPosition, returnErr error) {
+) (pduPosition int64, returnErr error) {
returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error {
var err error
pos, err := d.events.insertEvent(ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID)
if err != nil {
return err
}
- streamPos = types.StreamPosition(pos)
+ pduPosition = pos
if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 {
// Nothing to do, the event may have just been a message event.
return nil
}
- return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, streamPos)
+ return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition)
})
return
}
-func (d *SyncServerDatabase) updateRoomState(
+func (d *SyncServerDatasource) updateRoomState(
ctx context.Context, txn *sql.Tx,
removedEventIDs []string,
addedEvents []gomatrixserverlib.Event,
- streamPos types.StreamPosition,
+ pduPosition int64,
) error {
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
for _, eventID := range removedEventIDs {
@@ -157,7 +164,7 @@ func (d *SyncServerDatabase) updateRoomState(
}
membership = &value
}
- if err := d.roomstate.upsertRoomState(ctx, txn, event, membership, int64(streamPos)); err != nil {
+ if err := d.roomstate.upsertRoomState(ctx, txn, event, membership, pduPosition); err != nil {
return err
}
}
@@ -168,7 +175,7 @@ func (d *SyncServerDatabase) updateRoomState(
// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key
// If no event could be found, returns nil
// If there was an issue during the retrieval, returns an error
-func (d *SyncServerDatabase) GetStateEvent(
+func (d *SyncServerDatasource) GetStateEvent(
ctx context.Context, roomID, evType, stateKey string,
) (*gomatrixserverlib.Event, error) {
return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey)
@@ -177,7 +184,7 @@ func (d *SyncServerDatabase) GetStateEvent(
// GetStateEventsForRoom fetches the state events for a given room.
// Returns an empty slice if no state events could be found for this room.
// Returns an error if there was an issue with the retrieval.
-func (d *SyncServerDatabase) GetStateEventsForRoom(
+func (d *SyncServerDatasource) GetStateEventsForRoom(
ctx context.Context, roomID string,
) (stateEvents []gomatrixserverlib.Event, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
@@ -187,46 +194,49 @@ func (d *SyncServerDatabase) GetStateEventsForRoom(
return
}
-// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
-func (d *SyncServerDatabase) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) {
- return d.syncStreamPositionTx(ctx, nil)
+// SyncPosition returns the latest positions for syncing.
+func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.SyncPosition, error) {
+ return d.syncPositionTx(ctx, nil)
}
-func (d *SyncServerDatabase) syncStreamPositionTx(
+func (d *SyncServerDatasource) syncPositionTx(
ctx context.Context, txn *sql.Tx,
-) (types.StreamPosition, error) {
- maxID, err := d.events.selectMaxEventID(ctx, txn)
+) (sp types.SyncPosition, err error) {
+
+ maxEventID, err := d.events.selectMaxEventID(ctx, txn)
if err != nil {
- return 0, err
+ return sp, err
}
maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn)
if err != nil {
- return 0, err
+ return sp, err
}
- if maxAccountDataID > maxID {
- maxID = maxAccountDataID
+ if maxAccountDataID > maxEventID {
+ maxEventID = maxAccountDataID
}
maxInviteID, err := d.invites.selectMaxInviteID(ctx, txn)
if err != nil {
- return 0, err
+ return sp, err
}
- if maxInviteID > maxID {
- maxID = maxInviteID
+ if maxInviteID > maxEventID {
+ maxEventID = maxInviteID
}
- return types.StreamPosition(maxID), nil
+ sp.PDUPosition = maxEventID
+
+ sp.TypingPosition = d.typingCache.GetLatestSyncPosition()
+
+ return
}
-// IncrementalSync returns all the data needed in order to create an incremental
-// sync response for the given user. Events returned will include any client
-// transaction IDs associated with the given device. These transaction IDs come
-// from when the device sent the event via an API that included a transaction
-// ID.
-func (d *SyncServerDatabase) IncrementalSync(
+// addPDUDeltaToResponse adds all PDU deltas to a sync response.
+// IDs of all rooms the user joined are returned so EDU deltas can be added for them.
+func (d *SyncServerDatasource) addPDUDeltaToResponse(
ctx context.Context,
device authtypes.Device,
- fromPos, toPos types.StreamPosition,
+ fromPos, toPos int64,
numRecentEventsPerRoom int,
-) (*types.Response, error) {
+ res *types.Response,
+) ([]string, error) {
txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
if err != nil {
return nil, err
@@ -235,7 +245,7 @@ func (d *SyncServerDatabase) IncrementalSync(
defer common.EndTransaction(txn, &succeeded)
// Work out which rooms to return in the response. This is done by getting not only the currently
- // joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions.
+ // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions.
// This works out what the 'state' key should be for each room as well as which membership block
// to put the room into.
deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID)
@@ -243,8 +253,9 @@ func (d *SyncServerDatabase) IncrementalSync(
return nil, err
}
- res := types.NewResponse(toPos)
+ joinedRoomIDs := make([]string, 0, len(deltas))
for _, delta := range deltas {
+ joinedRoomIDs = append(joinedRoomIDs, delta.roomID)
err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res)
if err != nil {
return nil, err
@@ -257,52 +268,151 @@ func (d *SyncServerDatabase) IncrementalSync(
}
succeeded = true
- return res, nil
+ return joinedRoomIDs, nil
}
-// CompleteSync a complete /sync API response for the given user.
-func (d *SyncServerDatabase) CompleteSync(
- ctx context.Context, userID string, numRecentEventsPerRoom int,
+// addTypingDeltaToResponse adds all typing notifications to a sync response
+// since the specified position.
+func (d *SyncServerDatasource) addTypingDeltaToResponse(
+ since int64,
+ joinedRoomIDs []string,
+ res *types.Response,
+) error {
+ var jr types.JoinResponse
+ var ok bool
+ var err error
+ for _, roomID := range joinedRoomIDs {
+ if typingUsers, updated := d.typingCache.GetTypingUsersIfUpdatedAfter(
+ roomID, since,
+ ); updated {
+ ev := gomatrixserverlib.ClientEvent{
+ Type: gomatrixserverlib.MTyping,
+ }
+ ev.Content, err = json.Marshal(map[string]interface{}{
+ "user_ids": typingUsers,
+ })
+ if err != nil {
+ return err
+ }
+
+ if jr, ok = res.Rooms.Join[roomID]; !ok {
+ jr = *types.NewJoinResponse()
+ }
+ jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
+ res.Rooms.Join[roomID] = jr
+ }
+ }
+ return nil
+}
+
+// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if
+// the positions of that type are not equal in fromPos and toPos.
+func (d *SyncServerDatasource) addEDUDeltaToResponse(
+ fromPos, toPos types.SyncPosition,
+ joinedRoomIDs []string,
+ res *types.Response,
+) (err error) {
+
+ if fromPos.TypingPosition != toPos.TypingPosition {
+ err = d.addTypingDeltaToResponse(
+ fromPos.TypingPosition, joinedRoomIDs, res,
+ )
+ }
+
+ return
+}
+
+// IncrementalSync returns all the data needed in order to create an incremental
+// sync response for the given user. Events returned will include any client
+// transaction IDs associated with the given device. These transaction IDs come
+// from when the device sent the event via an API that included a transaction
+// ID.
+func (d *SyncServerDatasource) IncrementalSync(
+ ctx context.Context,
+ device authtypes.Device,
+ fromPos, toPos types.SyncPosition,
+ numRecentEventsPerRoom int,
) (*types.Response, error) {
+ nextBatchPos := fromPos.WithUpdates(toPos)
+ res := types.NewResponse(nextBatchPos)
+
+ var joinedRoomIDs []string
+ var err error
+ if fromPos.PDUPosition != toPos.PDUPosition {
+ joinedRoomIDs, err = d.addPDUDeltaToResponse(
+ ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, res,
+ )
+ } else {
+ joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(
+ ctx, nil, device.UserID, "join",
+ )
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ err = d.addEDUDeltaToResponse(
+ fromPos, toPos, joinedRoomIDs, res,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ return res, nil
+}
+
+// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed
+// to it. It returns toPos and joinedRoomIDs for use of adding EDUs.
+func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
+ ctx context.Context,
+ userID string,
+ numRecentEventsPerRoom int,
+) (
+ res *types.Response,
+ toPos types.SyncPosition,
+ joinedRoomIDs []string,
+ err error,
+) {
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
- // a consistent view of the database throughout. This includes extracting the sync stream position.
+ // a consistent view of the database throughout. This includes extracting the sync position.
// This does have the unfortunate side-effect that all the matrixy logic resides in this function,
// but it's better to not hide the fact that this is being done in a transaction.
txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
if err != nil {
- return nil, err
+ return
}
var succeeded bool
defer common.EndTransaction(txn, &succeeded)
- // Get the current stream position which we will base the sync response on.
- pos, err := d.syncStreamPositionTx(ctx, txn)
+ // Get the current sync position which we will base the sync response on.
+ toPos, err = d.syncPositionTx(ctx, txn)
if err != nil {
- return nil, err
+ return
}
+ res = types.NewResponse(toPos)
+
// Extract room state and recent events for all rooms the user is joined to.
- roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join")
+ joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join")
if err != nil {
- return nil, err
+ return
}
// Build up a /sync response. Add joined rooms.
- res := types.NewResponse(pos)
- for _, roomID := range roomIDs {
+ for _, roomID := range joinedRoomIDs {
var stateEvents []gomatrixserverlib.Event
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID)
if err != nil {
- return nil, err
+ return
}
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []streamEvent
recentStreamEvents, err = d.events.selectRecentEvents(
- ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom,
+ ctx, txn, roomID, 0, toPos.PDUPosition, numRecentEventsPerRoom,
)
if err != nil {
- return nil, err
+ return
}
// We don't include a device here as we don't need to send down
@@ -311,10 +421,12 @@ func (d *SyncServerDatabase) CompleteSync(
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr := types.NewJoinResponse()
- if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 {
- jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String()
+ if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 {
+ // Use the short form of batch token for prev_batch
+ jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
} else {
- jr.Timeline.PrevBatch = types.StreamPosition(1).String()
+ // Use the short form of batch token for prev_batch
+ jr.Timeline.PrevBatch = "1"
}
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true
@@ -322,12 +434,34 @@ func (d *SyncServerDatabase) CompleteSync(
res.Rooms.Join[roomID] = *jr
}
- if err = d.addInvitesToResponse(ctx, txn, userID, 0, pos, res); err != nil {
- return nil, err
+ if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition, res); err != nil {
+ return
}
succeeded = true
- return res, err
+ return res, toPos, joinedRoomIDs, err
+}
+
+// CompleteSync returns a complete /sync API response for the given user.
+func (d *SyncServerDatasource) CompleteSync(
+ ctx context.Context, userID string, numRecentEventsPerRoom int,
+) (*types.Response, error) {
+ res, toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync(
+ ctx, userID, numRecentEventsPerRoom,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ // Use a zero value SyncPosition for fromPos so all EDU states are added.
+ err = d.addEDUDeltaToResponse(
+ types.SyncPosition{}, toPos, joinedRoomIDs, res,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ return res, nil
}
var txReadOnlySnapshot = sql.TxOptions{
@@ -345,8 +479,8 @@ var txReadOnlySnapshot = sql.TxOptions{
// Returns a map following the format data[roomID] = []dataTypes
// If no data is retrieved, returns an empty map
// If there was an issue with the retrieval, returns an error
-func (d *SyncServerDatabase) GetAccountDataInRange(
- ctx context.Context, userID string, oldPos, newPos types.StreamPosition,
+func (d *SyncServerDatasource) GetAccountDataInRange(
+ ctx context.Context, userID string, oldPos, newPos int64,
) (map[string][]string, error) {
return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos)
}
@@ -357,26 +491,24 @@ func (d *SyncServerDatabase) GetAccountDataInRange(
// If no data with the given type, user ID and room ID exists in the database,
// creates a new row, else update the existing one
// Returns an error if there was an issue with the upsert
-func (d *SyncServerDatabase) UpsertAccountData(
+func (d *SyncServerDatasource) UpsertAccountData(
ctx context.Context, userID, roomID, dataType string,
-) (types.StreamPosition, error) {
- pos, err := d.accountData.insertAccountData(ctx, userID, roomID, dataType)
- return types.StreamPosition(pos), err
+) (int64, error) {
+ return d.accountData.insertAccountData(ctx, userID, roomID, dataType)
}
// AddInviteEvent stores a new invite event for a user.
// If the invite was successfully stored this returns the stream ID it was stored at.
// Returns an error if there was a problem communicating with the database.
-func (d *SyncServerDatabase) AddInviteEvent(
+func (d *SyncServerDatasource) AddInviteEvent(
ctx context.Context, inviteEvent gomatrixserverlib.Event,
-) (types.StreamPosition, error) {
- pos, err := d.invites.insertInviteEvent(ctx, inviteEvent)
- return types.StreamPosition(pos), err
+) (int64, error) {
+ return d.invites.insertInviteEvent(ctx, inviteEvent)
}
// RetireInviteEvent removes an old invite event from the database.
// Returns an error if there was a problem communicating with the database.
-func (d *SyncServerDatabase) RetireInviteEvent(
+func (d *SyncServerDatasource) RetireInviteEvent(
ctx context.Context, inviteEventID string,
) error {
// TODO: Record that invite has been retired in a stream so that we can
@@ -385,10 +517,30 @@ func (d *SyncServerDatabase) RetireInviteEvent(
return err
}
-func (d *SyncServerDatabase) addInvitesToResponse(
+func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) {
+ d.typingCache.SetTimeoutCallback(fn)
+}
+
+// AddTypingUser adds a typing user to the typing cache.
+// Returns the newly calculated sync position for typing notifications.
+func (d *SyncServerDatasource) AddTypingUser(
+ userID, roomID string, expireTime *time.Time,
+) int64 {
+ return d.typingCache.AddTypingUser(userID, roomID, expireTime)
+}
+
+// RemoveTypingUser removes a typing user from the typing cache.
+// Returns the newly calculated sync position for typing notifications.
+func (d *SyncServerDatasource) RemoveTypingUser(
+ userID, roomID string,
+) int64 {
+ return d.typingCache.RemoveUser(userID, roomID)
+}
+
+func (d *SyncServerDatasource) addInvitesToResponse(
ctx context.Context, txn *sql.Tx,
userID string,
- fromPos, toPos types.StreamPosition,
+ fromPos, toPos int64,
res *types.Response,
) error {
invites, err := d.invites.selectInviteEventsInRange(
@@ -409,11 +561,11 @@ func (d *SyncServerDatabase) addInvitesToResponse(
}
// addRoomDeltaToResponse adds a room state delta to a sync response
-func (d *SyncServerDatabase) addRoomDeltaToResponse(
+func (d *SyncServerDatasource) addRoomDeltaToResponse(
ctx context.Context,
device *authtypes.Device,
txn *sql.Tx,
- fromPos, toPos types.StreamPosition,
+ fromPos, toPos int64,
delta stateDelta,
numRecentEventsPerRoom int,
res *types.Response,
@@ -445,10 +597,12 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
switch delta.membership {
case "join":
jr := types.NewJoinResponse()
- if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 {
- jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String()
+ if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 {
+ // Use the short form of batch token for prev_batch
+ jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
} else {
- jr.Timeline.PrevBatch = types.StreamPosition(1).String()
+ // Use the short form of batch token for prev_batch
+ jr.Timeline.PrevBatch = "1"
}
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
@@ -460,10 +614,12 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
// TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room.
lr := types.NewLeaveResponse()
- if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 {
- lr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String()
+ if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 {
+ // Use the short form of batch token for prev_batch
+ lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
} else {
- lr.Timeline.PrevBatch = types.StreamPosition(1).String()
+ // Use the short form of batch token for prev_batch
+ lr.Timeline.PrevBatch = "1"
}
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
@@ -476,7 +632,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database.
// Returns a map of room ID to list of events.
-func (d *SyncServerDatabase) fetchStateEvents(
+func (d *SyncServerDatasource) fetchStateEvents(
ctx context.Context, txn *sql.Tx,
roomIDToEventIDSet map[string]map[string]bool,
eventIDToEvent map[string]streamEvent,
@@ -521,7 +677,7 @@ func (d *SyncServerDatabase) fetchStateEvents(
return stateBetween, nil
}
-func (d *SyncServerDatabase) fetchMissingStateEvents(
+func (d *SyncServerDatasource) fetchMissingStateEvents(
ctx context.Context, txn *sql.Tx, eventIDs []string,
) ([]streamEvent, error) {
// Fetch from the events table first so we pick up the stream ID for the
@@ -560,9 +716,9 @@ func (d *SyncServerDatabase) fetchMissingStateEvents(
return events, nil
}
-func (d *SyncServerDatabase) getStateDeltas(
+func (d *SyncServerDatasource) getStateDeltas(
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
- fromPos, toPos types.StreamPosition, userID string,
+ fromPos, toPos int64, userID string,
) ([]stateDelta, error) {
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
// - Get membership list changes for this user in this sync response
@@ -601,7 +757,7 @@ func (d *SyncServerDatabase) getStateDeltas(
}
s := make([]streamEvent, len(allState))
for i := 0; i < len(s); i++ {
- s[i] = streamEvent{Event: allState[i], streamPosition: types.StreamPosition(0)}
+ s[i] = streamEvent{Event: allState[i], streamPosition: 0}
}
state[roomID] = s
continue // we'll add this room in when we do joined rooms
diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go
index 5ed701d8..30ac3a2e 100644
--- a/syncapi/sync/notifier.go
+++ b/syncapi/sync/notifier.go
@@ -26,7 +26,7 @@ import (
)
// Notifier will wake up sleeping requests when there is some new data.
-// It does not tell requests what that data is, only the stream position which
+// It does not tell requests what that data is, only the sync position which
// they can use to get at it. This is done to prevent races whereby we tell the caller
// the event, but the token has already advanced by the time they fetch it, resulting
// in missed events.
@@ -35,18 +35,18 @@ type Notifier struct {
roomIDToJoinedUsers map[string]userIDSet
// Protects currPos and userStreams.
streamLock *sync.Mutex
- // The latest sync stream position
- currPos types.StreamPosition
+ // The latest sync position
+ currPos types.SyncPosition
// A map of user_id => UserStream which can be used to wake a given user's /sync request.
userStreams map[string]*UserStream
// The last time we cleaned out stale entries from the userStreams map
lastCleanUpTime time.Time
}
-// NewNotifier creates a new notifier set to the given stream position.
+// NewNotifier creates a new notifier set to the given sync position.
// In order for this to be of any use, the Notifier needs to be told all rooms and
// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
-func NewNotifier(pos types.StreamPosition) *Notifier {
+func NewNotifier(pos types.SyncPosition) *Notifier {
return &Notifier{
currPos: pos,
roomIDToJoinedUsers: make(map[string]userIDSet),
@@ -58,20 +58,30 @@ func NewNotifier(pos types.StreamPosition) *Notifier {
// OnNewEvent is called when a new event is received from the room server. Must only be
// called from a single goroutine, to avoid races between updates which could set the
-// current position in the stream incorrectly.
-// Can be called either with a *gomatrixserverlib.Event, or with an user ID
-func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos types.StreamPosition) {
+// current sync position incorrectly.
+// Chooses which user sync streams to update by a provided *gomatrixserverlib.Event
+// (based on the users in the event's room),
+// a roomID directly, or a list of user IDs, prioritised by parameter ordering.
+// posUpdate contains the latest position(s) for one or more types of events.
+// If a position in posUpdate is 0, it means no updates are available of that type.
+// Typically a consumer supplies a posUpdate with the latest sync position for the
+// event type it handles, leaving other fields as 0.
+func (n *Notifier) OnNewEvent(
+ ev *gomatrixserverlib.Event, roomID string, userIDs []string,
+ posUpdate types.SyncPosition,
+) {
// update the current position then notify relevant /sync streams.
// This needs to be done PRIOR to waking up users as they will read this value.
n.streamLock.Lock()
defer n.streamLock.Unlock()
- n.currPos = pos
+ latestPos := n.currPos.WithUpdates(posUpdate)
+ n.currPos = latestPos
n.removeEmptyUserStreams()
if ev != nil {
// Map this event's room_id to a list of joined users, and wake them up.
- userIDs := n.joinedUsers(ev.RoomID())
+ usersToNotify := n.joinedUsers(ev.RoomID())
// If this is an invite, also add in the invitee to this list.
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
targetUserID := *ev.StateKey()
@@ -84,11 +94,11 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty
// Keep the joined user map up-to-date
switch membership {
case "invite":
- userIDs = append(userIDs, targetUserID)
+ usersToNotify = append(usersToNotify, targetUserID)
case "join":
// Manually append the new user's ID so they get notified
// along all members in the room
- userIDs = append(userIDs, targetUserID)
+ usersToNotify = append(usersToNotify, targetUserID)
n.addJoinedUser(ev.RoomID(), targetUserID)
case "leave":
fallthrough
@@ -98,11 +108,15 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty
}
}
- for _, toNotifyUserID := range userIDs {
- n.wakeupUser(toNotifyUserID, pos)
- }
- } else if len(userID) > 0 {
- n.wakeupUser(userID, pos)
+ n.wakeupUsers(usersToNotify, latestPos)
+ } else if roomID != "" {
+ n.wakeupUsers(n.joinedUsers(roomID), latestPos)
+ } else if len(userIDs) > 0 {
+ n.wakeupUsers(userIDs, latestPos)
+ } else {
+ log.WithFields(log.Fields{
+ "posUpdate": posUpdate.String,
+ }).Warn("Notifier.OnNewEvent called but caller supplied no user to wake up")
}
}
@@ -127,7 +141,7 @@ func (n *Notifier) GetListener(req syncRequest) UserStreamListener {
}
// Load the membership states required to notify users correctly.
-func (n *Notifier) Load(ctx context.Context, db *storage.SyncServerDatabase) error {
+func (n *Notifier) Load(ctx context.Context, db *storage.SyncServerDatasource) error {
roomToUsers, err := db.AllJoinedUsersInRooms(ctx)
if err != nil {
return err
@@ -136,8 +150,11 @@ func (n *Notifier) Load(ctx context.Context, db *storage.SyncServerDatabase) err
return nil
}
-// CurrentPosition returns the current stream position
-func (n *Notifier) CurrentPosition() types.StreamPosition {
+// CurrentPosition returns the current sync position
+func (n *Notifier) CurrentPosition() types.SyncPosition {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
return n.currPos
}
@@ -156,12 +173,13 @@ func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
}
}
-func (n *Notifier) wakeupUser(userID string, newPos types.StreamPosition) {
- stream := n.fetchUserStream(userID, false)
- if stream == nil {
- return
+func (n *Notifier) wakeupUsers(userIDs []string, newPos types.SyncPosition) {
+ for _, userID := range userIDs {
+ stream := n.fetchUserStream(userID, false)
+ if stream != nil {
+ stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
+ }
}
- stream.Broadcast(newPos) // wakeup all goroutines Wait()ing on this stream
}
// fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true,
diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go
index 4fa54393..904315e9 100644
--- a/syncapi/sync/notifier_test.go
+++ b/syncapi/sync/notifier_test.go
@@ -32,19 +32,40 @@ var (
randomMessageEvent gomatrixserverlib.Event
aliceInviteBobEvent gomatrixserverlib.Event
bobLeaveEvent gomatrixserverlib.Event
+ syncPositionVeryOld types.SyncPosition
+ syncPositionBefore types.SyncPosition
+ syncPositionAfter types.SyncPosition
+ syncPositionNewEDU types.SyncPosition
+ syncPositionAfter2 types.SyncPosition
)
var (
- streamPositionVeryOld = types.StreamPosition(5)
- streamPositionBefore = types.StreamPosition(11)
- streamPositionAfter = types.StreamPosition(12)
- streamPositionAfter2 = types.StreamPosition(13)
- roomID = "!test:localhost"
- alice = "@alice:localhost"
- bob = "@bob:localhost"
+ roomID = "!test:localhost"
+ alice = "@alice:localhost"
+ bob = "@bob:localhost"
)
func init() {
+ baseSyncPos := types.SyncPosition{
+ PDUPosition: 0,
+ TypingPosition: 0,
+ }
+
+ syncPositionVeryOld = baseSyncPos
+ syncPositionVeryOld.PDUPosition = 5
+
+ syncPositionBefore = baseSyncPos
+ syncPositionBefore.PDUPosition = 11
+
+ syncPositionAfter = baseSyncPos
+ syncPositionAfter.PDUPosition = 12
+
+ syncPositionNewEDU = syncPositionAfter
+ syncPositionNewEDU.TypingPosition = 1
+
+ syncPositionAfter2 = baseSyncPos
+ syncPositionAfter2.PDUPosition = 13
+
var err error
randomMessageEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{
"type": "m.room.message",
@@ -92,19 +113,19 @@ func init() {
// Test that the current position is returned if a request is already behind.
func TestImmediateNotification(t *testing.T) {
- n := NewNotifier(streamPositionBefore)
- pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionVeryOld))
+ n := NewNotifier(syncPositionBefore)
+ pos, err := waitForEvents(n, newTestSyncRequest(alice, syncPositionVeryOld))
if err != nil {
t.Fatalf("TestImmediateNotification error: %s", err)
}
- if pos != streamPositionBefore {
- t.Fatalf("TestImmediateNotification want %d, got %d", streamPositionBefore, pos)
+ if pos != syncPositionBefore {
+ t.Fatalf("TestImmediateNotification want %d, got %d", syncPositionBefore, pos)
}
}
// Test that new events to a joined room unblocks the request.
func TestNewEventAndJoinedToRoom(t *testing.T) {
- n := NewNotifier(streamPositionBefore)
+ n := NewNotifier(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@@ -112,12 +133,12 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
- pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
+ pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
if err != nil {
t.Errorf("TestNewEventAndJoinedToRoom error: %s", err)
}
- if pos != streamPositionAfter {
- t.Errorf("TestNewEventAndJoinedToRoom want %d, got %d", streamPositionAfter, pos)
+ if pos != syncPositionAfter {
+ t.Errorf("TestNewEventAndJoinedToRoom want %d, got %d", syncPositionAfter, pos)
}
wg.Done()
}()
@@ -125,14 +146,42 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
stream := n.fetchUserStream(bob, true)
waitForBlocking(stream, 1)
- n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter)
+ n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
wg.Wait()
}
// Test that an invite unblocks the request
func TestNewInviteEventForUser(t *testing.T) {
- n := NewNotifier(streamPositionBefore)
+ n := NewNotifier(syncPositionBefore)
+ n.setUsersJoinedToRooms(map[string][]string{
+ roomID: {alice, bob},
+ })
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
+ if err != nil {
+ t.Errorf("TestNewInviteEventForUser error: %s", err)
+ }
+ if pos != syncPositionAfter {
+ t.Errorf("TestNewInviteEventForUser want %d, got %d", syncPositionAfter, pos)
+ }
+ wg.Done()
+ }()
+
+ stream := n.fetchUserStream(bob, true)
+ waitForBlocking(stream, 1)
+
+ n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter)
+
+ wg.Wait()
+}
+
+// Test an EDU-only update wakes up the request.
+func TestEDUWakeup(t *testing.T) {
+ n := NewNotifier(syncPositionAfter)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@@ -140,12 +189,12 @@ func TestNewInviteEventForUser(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
- pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
+ pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionAfter))
if err != nil {
t.Errorf("TestNewInviteEventForUser error: %s", err)
}
- if pos != streamPositionAfter {
- t.Errorf("TestNewInviteEventForUser want %d, got %d", streamPositionAfter, pos)
+ if pos != syncPositionNewEDU {
+ t.Errorf("TestNewInviteEventForUser want %d, got %d", syncPositionNewEDU, pos)
}
wg.Done()
}()
@@ -153,14 +202,14 @@ func TestNewInviteEventForUser(t *testing.T) {
stream := n.fetchUserStream(bob, true)
waitForBlocking(stream, 1)
- n.OnNewEvent(&aliceInviteBobEvent, "", streamPositionAfter)
+ n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionNewEDU)
wg.Wait()
}
// Test that all blocked requests get woken up on a new event.
func TestMultipleRequestWakeup(t *testing.T) {
- n := NewNotifier(streamPositionBefore)
+ n := NewNotifier(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@@ -168,12 +217,12 @@ func TestMultipleRequestWakeup(t *testing.T) {
var wg sync.WaitGroup
wg.Add(3)
poll := func() {
- pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
+ pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
if err != nil {
t.Errorf("TestMultipleRequestWakeup error: %s", err)
}
- if pos != streamPositionAfter {
- t.Errorf("TestMultipleRequestWakeup want %d, got %d", streamPositionAfter, pos)
+ if pos != syncPositionAfter {
+ t.Errorf("TestMultipleRequestWakeup want %d, got %d", syncPositionAfter, pos)
}
wg.Done()
}
@@ -184,7 +233,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
stream := n.fetchUserStream(bob, true)
waitForBlocking(stream, 3)
- n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter)
+ n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
wg.Wait()
@@ -198,7 +247,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
// listen as bob. Make bob leave room. Make alice send event to room.
// Make sure alice gets woken up only and not bob as well.
- n := NewNotifier(streamPositionBefore)
+ n := NewNotifier(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@@ -208,18 +257,18 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
// Make bob leave the room
leaveWG.Add(1)
go func() {
- pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
+ pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
if err != nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
}
- if pos != streamPositionAfter {
- t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter, pos)
+ if pos != syncPositionAfter {
+ t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", syncPositionAfter, pos)
}
leaveWG.Done()
}()
bobStream := n.fetchUserStream(bob, true)
waitForBlocking(bobStream, 1)
- n.OnNewEvent(&bobLeaveEvent, "", streamPositionAfter)
+ n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter)
leaveWG.Wait()
// send an event into the room. Make sure alice gets it. Bob should not.
@@ -227,19 +276,19 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
aliceStream := n.fetchUserStream(alice, true)
aliceWG.Add(1)
go func() {
- pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionAfter))
+ pos, err := waitForEvents(n, newTestSyncRequest(alice, syncPositionAfter))
if err != nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
}
- if pos != streamPositionAfter2 {
- t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter2, pos)
+ if pos != syncPositionAfter2 {
+ t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", syncPositionAfter2, pos)
}
aliceWG.Done()
}()
go func() {
// this should timeout with an error (but the main goroutine won't wait for the timeout explicitly)
- _, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionAfter))
+ _, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionAfter))
if err == nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil")
}
@@ -248,7 +297,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
waitForBlocking(aliceStream, 1)
waitForBlocking(bobStream, 1)
- n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter2)
+ n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter2)
aliceWG.Wait()
// it's possible that at this point alice has been informed and bob is about to be informed, so wait
@@ -256,18 +305,17 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
time.Sleep(1 * time.Millisecond)
}
-// same as Notifier.WaitForEvents but with a timeout.
-func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
+func waitForEvents(n *Notifier, req syncRequest) (types.SyncPosition, error) {
listener := n.GetListener(req)
defer listener.Close()
select {
case <-time.After(5 * time.Second):
- return types.StreamPosition(0), fmt.Errorf(
+ return types.SyncPosition{}, fmt.Errorf(
"waitForEvents timed out waiting for %s (pos=%d)", req.device.UserID, req.since,
)
case <-listener.GetNotifyChannel(*req.since):
- p := listener.GetStreamPosition()
+ p := listener.GetSyncPosition()
return p, nil
}
}
@@ -280,7 +328,7 @@ func waitForBlocking(s *UserStream, numBlocking uint) {
}
}
-func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest {
+func newTestSyncRequest(userID string, since types.SyncPosition) syncRequest {
return syncRequest{
device: authtypes.Device{UserID: userID},
timeout: 1 * time.Minute,
diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go
index 35a15f6f..a5d2f60f 100644
--- a/syncapi/sync/request.go
+++ b/syncapi/sync/request.go
@@ -16,8 +16,10 @@ package sync
import (
"context"
+ "errors"
"net/http"
"strconv"
+ "strings"
"time"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
@@ -36,7 +38,7 @@ type syncRequest struct {
device authtypes.Device
limit int
timeout time.Duration
- since *types.StreamPosition // nil means that no since token was supplied
+ since *types.SyncPosition // nil means that no since token was supplied
wantFullState bool
log *log.Entry
}
@@ -73,15 +75,41 @@ func getTimeout(timeoutMS string) time.Duration {
}
// getSyncStreamPosition tries to parse a 'since' token taken from the API to a
-// stream position. If the string is empty then (nil, nil) is returned.
-func getSyncStreamPosition(since string) (*types.StreamPosition, error) {
+// types.SyncPosition. If the string is empty then (nil, nil) is returned.
+// There are two forms of tokens: The full length form containing all PDU and EDU
+// positions separated by "_", and the short form containing only the PDU
+// position. Short form can be used for, e.g., `prev_batch` tokens.
+func getSyncStreamPosition(since string) (*types.SyncPosition, error) {
if since == "" {
return nil, nil
}
- i, err := strconv.Atoi(since)
- if err != nil {
- return nil, err
+
+ posStrings := strings.Split(since, "_")
+ if len(posStrings) != 2 && len(posStrings) != 1 {
+ // A token can either be full length or short (PDU-only).
+ return nil, errors.New("malformed batch token")
+ }
+
+ positions := make([]int64, len(posStrings))
+ for i, posString := range posStrings {
+ pos, err := strconv.ParseInt(posString, 10, 64)
+ if err != nil {
+ return nil, err
+ }
+ positions[i] = pos
+ }
+
+ if len(positions) == 2 {
+ // Full length token; construct SyncPosition with every entry in
+ // `positions`. These entries must have the same order with the fields
+ // in struct SyncPosition, so we disable the govet check below.
+ return &types.SyncPosition{ //nolint:govet
+ positions[0], positions[1],
+ }, nil
+ } else {
+ // Token with PDU position only
+ return &types.SyncPosition{
+ PDUPosition: positions[0],
+ }, nil
}
- token := types.StreamPosition(i)
- return &token, nil
}
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index 89137eb5..a6ec6bd9 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -31,13 +31,13 @@ import (
// RequestPool manages HTTP long-poll connections for /sync
type RequestPool struct {
- db *storage.SyncServerDatabase
+ db *storage.SyncServerDatasource
accountDB *accounts.Database
notifier *Notifier
}
// NewRequestPool makes a new RequestPool
-func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier, adb *accounts.Database) *RequestPool {
+func NewRequestPool(db *storage.SyncServerDatasource, n *Notifier, adb *accounts.Database) *RequestPool {
return &RequestPool{db, adb, n}
}
@@ -92,11 +92,13 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
// respond with, so we skip the return an go back to waiting for content to
// be sent down or the request timing out.
var hasTimedOut bool
+ sincePos := *syncReq.since
for {
select {
// Wait for notifier to wake us up
- case <-userStreamListener.GetNotifyChannel(currPos):
- currPos = userStreamListener.GetStreamPosition()
+ case <-userStreamListener.GetNotifyChannel(sincePos):
+ currPos = userStreamListener.GetSyncPosition()
+ sincePos = currPos
// Or for timeout to expire
case <-timer.C:
// We just need to ensure we get out of the select after reaching the
@@ -128,24 +130,24 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
}
}
-func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) {
+func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.SyncPosition) (res *types.Response, err error) {
// TODO: handle ignored users
if req.since == nil {
res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit)
} else {
- res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, currentPos, req.limit)
+ res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit)
}
if err != nil {
return
}
- res, err = rp.appendAccountData(res, req.device.UserID, req, currentPos)
+ res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition)
return
}
func (rp *RequestPool) appendAccountData(
- data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
+ data *types.Response, userID string, req syncRequest, currentPos int64,
) (*types.Response, error) {
// TODO: Account data doesn't have a sync position of its own, meaning that
// account data might be sent multiple time to the client if multiple account
@@ -179,7 +181,7 @@ func (rp *RequestPool) appendAccountData(
}
// Sync is not initial, get all account data since the latest sync
- dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, *req.since, currentPos)
+ dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, req.since.PDUPosition, currentPos)
if err != nil {
return nil, err
}
diff --git a/syncapi/sync/userstream.go b/syncapi/sync/userstream.go
index 77d09c20..beb10e48 100644
--- a/syncapi/sync/userstream.go
+++ b/syncapi/sync/userstream.go
@@ -34,8 +34,8 @@ type UserStream struct {
lock sync.Mutex
// Closed when there is an update.
signalChannel chan struct{}
- // The last stream position that there may have been an update for the suser
- pos types.StreamPosition
+ // The last sync position that there may have been an update for the user
+ pos types.SyncPosition
// The last time when we had some listeners waiting
timeOfLastChannel time.Time
// The number of listeners waiting
@@ -51,7 +51,7 @@ type UserStreamListener struct {
}
// NewUserStream creates a new user stream
-func NewUserStream(userID string, currPos types.StreamPosition) *UserStream {
+func NewUserStream(userID string, currPos types.SyncPosition) *UserStream {
return &UserStream{
UserID: userID,
timeOfLastChannel: time.Now(),
@@ -84,8 +84,8 @@ func (s *UserStream) GetListener(ctx context.Context) UserStreamListener {
return listener
}
-// Broadcast a new stream position for this user.
-func (s *UserStream) Broadcast(pos types.StreamPosition) {
+// Broadcast a new sync position for this user.
+func (s *UserStream) Broadcast(pos types.SyncPosition) {
s.lock.Lock()
defer s.lock.Unlock()
@@ -118,9 +118,9 @@ func (s *UserStream) TimeOfLastNonEmpty() time.Time {
return s.timeOfLastChannel
}
-// GetStreamPosition returns last stream position which the UserStream was
+// GetStreamPosition returns last sync position which the UserStream was
// notified about
-func (s *UserStreamListener) GetStreamPosition() types.StreamPosition {
+func (s *UserStreamListener) GetSyncPosition() types.SyncPosition {
s.userStream.lock.Lock()
defer s.userStream.lock.Unlock()
@@ -132,11 +132,11 @@ func (s *UserStreamListener) GetStreamPosition() types.StreamPosition {
// sincePos specifies from which point we want to be notified about. If there
// has already been an update after sincePos we'll return a closed channel
// immediately.
-func (s *UserStreamListener) GetNotifyChannel(sincePos types.StreamPosition) <-chan struct{} {
+func (s *UserStreamListener) GetNotifyChannel(sincePos types.SyncPosition) <-chan struct{} {
s.userStream.lock.Lock()
defer s.userStream.lock.Unlock()
- if sincePos < s.userStream.pos {
+ if s.userStream.pos.IsAfter(sincePos) {
// If the listener is behind, i.e. missed a potential update, then we
// want them to wake up immediately. We do this by returning a new
// closed stream, which returns immediately when selected.
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 2db54c3c..4738feea 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -28,7 +28,6 @@ import (
"github.com/matrix-org/dendrite/syncapi/routing"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
- "github.com/matrix-org/dendrite/syncapi/types"
)
// SetupSyncAPIComponent sets up and registers HTTP handlers for the SyncAPI
@@ -39,17 +38,17 @@ func SetupSyncAPIComponent(
accountsDB *accounts.Database,
queryAPI api.RoomserverQueryAPI,
) {
- syncDB, err := storage.NewSyncServerDatabase(string(base.Cfg.Database.SyncAPI))
+ syncDB, err := storage.NewSyncServerDatasource(string(base.Cfg.Database.SyncAPI))
if err != nil {
logrus.WithError(err).Panicf("failed to connect to sync db")
}
- pos, err := syncDB.SyncStreamPosition(context.Background())
+ pos, err := syncDB.SyncPosition(context.Background())
if err != nil {
- logrus.WithError(err).Panicf("failed to get stream position")
+ logrus.WithError(err).Panicf("failed to get sync position")
}
- notifier := sync.NewNotifier(types.StreamPosition(pos))
+ notifier := sync.NewNotifier(pos)
err = notifier.Load(context.Background(), syncDB)
if err != nil {
logrus.WithError(err).Panicf("failed to start notifier")
@@ -71,5 +70,12 @@ func SetupSyncAPIComponent(
logrus.WithError(err).Panicf("failed to start client data consumer")
}
+ typingConsumer := consumers.NewOutputTypingEventConsumer(
+ base.Cfg, base.KafkaConsumer, notifier, syncDB,
+ )
+ if err = typingConsumer.Start(); err != nil {
+ logrus.WithError(err).Panicf("failed to start typing server consumer")
+ }
+
routing.Setup(base.APIMux, requestPool, syncDB, deviceDB)
}
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index d0b1c38a..af7ec865 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -21,12 +21,38 @@ import (
"github.com/matrix-org/gomatrixserverlib"
)
-// StreamPosition represents the offset in the sync stream a client is at.
-type StreamPosition int64
+// SyncPosition contains the PDU and EDU stream sync positions for a client.
+type SyncPosition struct {
+ // PDUPosition is the stream position for PDUs the client is at.
+ PDUPosition int64
+ // TypingPosition is the client's position for typing notifications.
+ TypingPosition int64
+}
// String implements the Stringer interface.
-func (sp StreamPosition) String() string {
- return strconv.FormatInt(int64(sp), 10)
+func (sp SyncPosition) String() string {
+ return strconv.FormatInt(sp.PDUPosition, 10) + "_" +
+ strconv.FormatInt(sp.TypingPosition, 10)
+}
+
+// IsAfter returns whether one SyncPosition refers to states newer than another SyncPosition.
+func (sp SyncPosition) IsAfter(other SyncPosition) bool {
+ return sp.PDUPosition > other.PDUPosition ||
+ sp.TypingPosition > other.TypingPosition
+}
+
+// WithUpdates returns a copy of the SyncPosition with updates applied from another SyncPosition.
+// If the latter SyncPosition contains a field that is not 0, it is considered an update,
+// and its value will replace the corresponding value in the SyncPosition on which WithUpdates is called.
+func (sp SyncPosition) WithUpdates(other SyncPosition) SyncPosition {
+ ret := sp
+ if other.PDUPosition != 0 {
+ ret.PDUPosition = other.PDUPosition
+ }
+ if other.TypingPosition != 0 {
+ ret.TypingPosition = other.TypingPosition
+ }
+ return ret
}
// PrevEventRef represents a reference to a previous event in a state event upgrade
@@ -53,11 +79,10 @@ type Response struct {
}
// NewResponse creates an empty response with initialised maps.
-func NewResponse(pos StreamPosition) *Response {
- res := Response{}
- // Make sure we send the next_batch as a string. We don't want to confuse clients by sending this
- // as an integer even though (at the moment) it is.
- res.NextBatch = pos.String()
+func NewResponse(pos SyncPosition) *Response {
+ res := Response{
+ NextBatch: pos.String(),
+ }
// Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section,
// so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.
res.Rooms.Join = make(map[string]JoinResponse)