aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/consumers')
-rw-r--r--syncapi/consumers/clientapi.go16
-rw-r--r--syncapi/consumers/eduserver_receipts.go17
-rw-r--r--syncapi/consumers/eduserver_sendtodevice.go12
-rw-r--r--syncapi/consumers/eduserver_typing.go33
-rw-r--r--syncapi/consumers/keychange.go24
-rw-r--r--syncapi/consumers/roomserver.go61
6 files changed, 102 insertions, 61 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index 9883c6b0..4958f221 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -22,8 +22,8 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
- "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
)
@@ -32,15 +32,17 @@ import (
type OutputClientDataConsumer struct {
clientAPIConsumer *internal.ContinualConsumer
db storage.Database
- notifier *sync.Notifier
+ stream types.StreamProvider
+ notifier *notifier.Notifier
}
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
func NewOutputClientDataConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
- n *sync.Notifier,
store storage.Database,
+ notifier *notifier.Notifier,
+ stream types.StreamProvider,
) *OutputClientDataConsumer {
consumer := internal.ContinualConsumer{
@@ -52,7 +54,8 @@ func NewOutputClientDataConsumer(
s := &OutputClientDataConsumer{
clientAPIConsumer: &consumer,
db: store,
- notifier: n,
+ notifier: notifier,
+ stream: stream,
}
consumer.ProcessMessage = s.onMessage
@@ -81,7 +84,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": output.RoomID,
}).Info("received data from client API server")
- pduPos, err := s.db.UpsertAccountData(
+ streamPos, err := s.db.UpsertAccountData(
context.TODO(), string(msg.Key), output.RoomID, output.Type,
)
if err != nil {
@@ -92,7 +95,8 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Panicf("could not save account data")
}
- s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.StreamingToken{PDUPosition: pduPos})
+ s.stream.Advance(streamPos)
+ s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos})
return nil
}
diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go
index 88334b65..bd538eff 100644
--- a/syncapi/consumers/eduserver_receipts.go
+++ b/syncapi/consumers/eduserver_receipts.go
@@ -18,14 +18,13 @@ import (
"context"
"encoding/json"
- "github.com/matrix-org/dendrite/syncapi/types"
-
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
- "github.com/matrix-org/dendrite/syncapi/sync"
+ "github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
)
@@ -33,7 +32,8 @@ import (
type OutputReceiptEventConsumer struct {
receiptConsumer *internal.ContinualConsumer
db storage.Database
- notifier *sync.Notifier
+ stream types.StreamProvider
+ notifier *notifier.Notifier
}
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
@@ -41,8 +41,9 @@ type OutputReceiptEventConsumer struct {
func NewOutputReceiptEventConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
- n *sync.Notifier,
store storage.Database,
+ notifier *notifier.Notifier,
+ stream types.StreamProvider,
) *OutputReceiptEventConsumer {
consumer := internal.ContinualConsumer{
@@ -55,7 +56,8 @@ func NewOutputReceiptEventConsumer(
s := &OutputReceiptEventConsumer{
receiptConsumer: &consumer,
db: store,
- notifier: n,
+ notifier: notifier,
+ stream: stream,
}
consumer.ProcessMessage = s.onMessage
@@ -87,7 +89,8 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
if err != nil {
return err
}
- // update stream position
+
+ s.stream.Advance(streamPos)
s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
return nil
diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go
index a375baf8..6e774b5b 100644
--- a/syncapi/consumers/eduserver_sendtodevice.go
+++ b/syncapi/consumers/eduserver_sendtodevice.go
@@ -22,8 +22,8 @@ import (
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
- "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@@ -35,7 +35,8 @@ type OutputSendToDeviceEventConsumer struct {
sendToDeviceConsumer *internal.ContinualConsumer
db storage.Database
serverName gomatrixserverlib.ServerName // our server name
- notifier *sync.Notifier
+ stream types.StreamProvider
+ notifier *notifier.Notifier
}
// NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer.
@@ -43,8 +44,9 @@ type OutputSendToDeviceEventConsumer struct {
func NewOutputSendToDeviceEventConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
- n *sync.Notifier,
store storage.Database,
+ notifier *notifier.Notifier,
+ stream types.StreamProvider,
) *OutputSendToDeviceEventConsumer {
consumer := internal.ContinualConsumer{
@@ -58,7 +60,8 @@ func NewOutputSendToDeviceEventConsumer(
sendToDeviceConsumer: &consumer,
db: store,
serverName: cfg.Matrix.ServerName,
- notifier: n,
+ notifier: notifier,
+ stream: stream,
}
consumer.ProcessMessage = s.onMessage
@@ -102,6 +105,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
return err
}
+ s.stream.Advance(streamPos)
s.notifier.OnNewSendToDevice(
output.UserID,
[]string{output.DeviceID},
diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go
index 28574b50..3edf6675 100644
--- a/syncapi/consumers/eduserver_typing.go
+++ b/syncapi/consumers/eduserver_typing.go
@@ -19,10 +19,11 @@ import (
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/eduserver/api"
+ "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
- "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
)
@@ -30,8 +31,9 @@ import (
// OutputTypingEventConsumer consumes events that originated in the EDU server.
type OutputTypingEventConsumer struct {
typingConsumer *internal.ContinualConsumer
- db storage.Database
- notifier *sync.Notifier
+ eduCache *cache.EDUCache
+ stream types.StreamProvider
+ notifier *notifier.Notifier
}
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
@@ -39,8 +41,10 @@ type OutputTypingEventConsumer struct {
func NewOutputTypingEventConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
- n *sync.Notifier,
store storage.Database,
+ eduCache *cache.EDUCache,
+ notifier *notifier.Notifier,
+ stream types.StreamProvider,
) *OutputTypingEventConsumer {
consumer := internal.ContinualConsumer{
@@ -52,8 +56,9 @@ func NewOutputTypingEventConsumer(
s := &OutputTypingEventConsumer{
typingConsumer: &consumer,
- db: store,
- notifier: n,
+ eduCache: eduCache,
+ notifier: notifier,
+ stream: stream,
}
consumer.ProcessMessage = s.onMessage
@@ -63,10 +68,10 @@ func NewOutputTypingEventConsumer(
// Start consuming from EDU api
func (s *OutputTypingEventConsumer) Start() error {
- s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
- s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: types.StreamPosition(latestSyncPosition)})
+ s.eduCache.SetTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
+ pos := types.StreamPosition(latestSyncPosition)
+ s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: pos})
})
-
return s.typingConsumer.Start()
}
@@ -87,11 +92,17 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
var typingPos types.StreamPosition
typingEvent := output.Event
if typingEvent.Typing {
- typingPos = s.db.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime)
+ typingPos = types.StreamPosition(
+ s.eduCache.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime),
+ )
} else {
- typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
+ typingPos = types.StreamPosition(
+ s.eduCache.RemoveUser(typingEvent.UserID, typingEvent.RoomID),
+ )
}
+ s.stream.Advance(typingPos)
s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos})
+
return nil
}
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index 59cd583d..af7b280f 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -23,8 +23,8 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
- syncapi "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
@@ -34,12 +34,13 @@ import (
type OutputKeyChangeEventConsumer struct {
keyChangeConsumer *internal.ContinualConsumer
db storage.Database
+ notifier *notifier.Notifier
+ stream types.PartitionedStreamProvider
serverName gomatrixserverlib.ServerName // our server name
rsAPI roomserverAPI.RoomserverInternalAPI
keyAPI api.KeyInternalAPI
partitionToOffset map[int32]int64
partitionToOffsetMu sync.Mutex
- notifier *syncapi.Notifier
}
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
@@ -48,10 +49,11 @@ func NewOutputKeyChangeEventConsumer(
serverName gomatrixserverlib.ServerName,
topic string,
kafkaConsumer sarama.Consumer,
- n *syncapi.Notifier,
keyAPI api.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
store storage.Database,
+ notifier *notifier.Notifier,
+ stream types.PartitionedStreamProvider,
) *OutputKeyChangeEventConsumer {
consumer := internal.ContinualConsumer{
@@ -69,7 +71,8 @@ func NewOutputKeyChangeEventConsumer(
rsAPI: rsAPI,
partitionToOffset: make(map[int32]int64),
partitionToOffsetMu: sync.Mutex{},
- notifier: n,
+ notifier: notifier,
+ stream: stream,
}
consumer.ProcessMessage = s.onMessage
@@ -114,14 +117,15 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
}
// make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1
- posUpdate := types.StreamingToken{
- DeviceListPosition: types.LogPosition{
- Offset: msg.Offset,
- Partition: msg.Partition,
- },
+ posUpdate := types.LogPosition{
+ Offset: msg.Offset,
+ Partition: msg.Partition,
}
+
+ s.stream.Advance(posUpdate)
for userID := range queryRes.UserIDsToCount {
- s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID)
+ s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
}
+
return nil
}
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 399f67ba..1d47b73a 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -23,8 +23,8 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
- "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
@@ -32,19 +32,23 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
- cfg *config.SyncAPI
- rsAPI api.RoomserverInternalAPI
- rsConsumer *internal.ContinualConsumer
- db storage.Database
- notifier *sync.Notifier
+ cfg *config.SyncAPI
+ rsAPI api.RoomserverInternalAPI
+ rsConsumer *internal.ContinualConsumer
+ db storage.Database
+ pduStream types.StreamProvider
+ inviteStream types.StreamProvider
+ notifier *notifier.Notifier
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
- n *sync.Notifier,
store storage.Database,
+ notifier *notifier.Notifier,
+ pduStream types.StreamProvider,
+ inviteStream types.StreamProvider,
rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
@@ -55,11 +59,13 @@ func NewOutputRoomEventConsumer(
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
- cfg: cfg,
- rsConsumer: &consumer,
- db: store,
- notifier: n,
- rsAPI: rsAPI,
+ cfg: cfg,
+ rsConsumer: &consumer,
+ db: store,
+ notifier: notifier,
+ pduStream: pduStream,
+ inviteStream: inviteStream,
+ rsAPI: rsAPI,
}
consumer.ProcessMessage = s.onMessage
@@ -180,7 +186,8 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return err
}
- s.notifier.OnNewEvent(ev, "", nil, types.StreamingToken{PDUPosition: pduPos})
+ s.pduStream.Advance(pduPos)
+ s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
return nil
}
@@ -219,7 +226,8 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
return err
}
- s.notifier.OnNewEvent(ev, "", nil, types.StreamingToken{PDUPosition: pduPos})
+ s.pduStream.Advance(pduPos)
+ s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
return nil
}
@@ -274,7 +282,10 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure")
return nil
}
+
+ s.inviteStream.Advance(pduPos)
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey())
+
return nil
}
@@ -290,9 +301,11 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
}).Panicf("roomserver output log: remove invite failure")
return nil
}
+
// Notify any active sync requests that the invite has been retired.
- // Invites share the same stream counter as PDUs
+ s.inviteStream.Advance(pduPos)
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID)
+
return nil
}
@@ -307,12 +320,13 @@ func (s *OutputRoomEventConsumer) onNewPeek(
}).Panicf("roomserver output log: write peek failure")
return nil
}
+
// tell the notifier about the new peek so it knows to wake up new devices
- s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID)
+ // TODO: This only works because the peeks table is reusing the same
+ // index as PDUs, but we should fix this
+ s.pduStream.Advance(sp)
+ s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
- // we need to wake up the users who might need to now be peeking into this room,
- // so we send in a dummy event to trigger a wakeup
- s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
return nil
}
@@ -327,12 +341,13 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
}).Panicf("roomserver output log: write peek failure")
return nil
}
+
// tell the notifier about the new peek so it knows to wake up new devices
- s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID)
+ // TODO: This only works because the peeks table is reusing the same
+ // index as PDUs, but we should fix this
+ s.pduStream.Advance(sp)
+ s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
- // we need to wake up the users who might need to now be peeking into this room,
- // so we send in a dummy event to trigger a wakeup
- s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
return nil
}