aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/consumers')
-rw-r--r--syncapi/consumers/clientapi.go5
-rw-r--r--syncapi/consumers/keychange.go5
-rw-r--r--syncapi/consumers/presence.go5
-rw-r--r--syncapi/consumers/receipts.go5
-rw-r--r--syncapi/consumers/roomserver.go19
-rw-r--r--syncapi/consumers/sendtodevice.go5
-rw-r--r--syncapi/consumers/typing.go5
-rw-r--r--syncapi/consumers/userapi.go5
8 files changed, 34 insertions, 20 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index 796cc61e..735f6718 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -34,6 +34,7 @@ import (
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -46,7 +47,7 @@ type OutputClientDataConsumer struct {
topic string
topicReIndex string
db storage.Database
- stream types.StreamProvider
+ stream streams.StreamProvider
notifier *notifier.Notifier
serverName gomatrixserverlib.ServerName
fts *fulltext.Search
@@ -61,7 +62,7 @@ func NewOutputClientDataConsumer(
nats *nats.Conn,
store storage.Database,
notifier *notifier.Notifier,
- stream types.StreamProvider,
+ stream streams.StreamProvider,
fts *fulltext.Search,
) *OutputClientDataConsumer {
return &OutputClientDataConsumer{
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index c42e7197..dc7d9e20 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
@@ -40,7 +41,7 @@ type OutputKeyChangeEventConsumer struct {
topic string
db storage.Database
notifier *notifier.Notifier
- stream types.StreamProvider
+ stream streams.StreamProvider
serverName gomatrixserverlib.ServerName // our server name
rsAPI roomserverAPI.SyncRoomserverAPI
}
@@ -55,7 +56,7 @@ func NewOutputKeyChangeEventConsumer(
rsAPI roomserverAPI.SyncRoomserverAPI,
store storage.Database,
notifier *notifier.Notifier,
- stream types.StreamProvider,
+ stream streams.StreamProvider,
) *OutputKeyChangeEventConsumer {
s := &OutputKeyChangeEventConsumer{
ctx: process.Context(),
diff --git a/syncapi/consumers/presence.go b/syncapi/consumers/presence.go
index 61bdc13d..145059c2 100644
--- a/syncapi/consumers/presence.go
+++ b/syncapi/consumers/presence.go
@@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@@ -39,7 +40,7 @@ type PresenceConsumer struct {
requestTopic string
presenceTopic string
db storage.Database
- stream types.StreamProvider
+ stream streams.StreamProvider
notifier *notifier.Notifier
deviceAPI api.SyncUserAPI
cfg *config.SyncAPI
@@ -54,7 +55,7 @@ func NewPresenceConsumer(
nats *nats.Conn,
db storage.Database,
notifier *notifier.Notifier,
- stream types.StreamProvider,
+ stream streams.StreamProvider,
deviceAPI api.SyncUserAPI,
) *PresenceConsumer {
return &PresenceConsumer{
diff --git a/syncapi/consumers/receipts.go b/syncapi/consumers/receipts.go
index 4379dd13..8aaa6573 100644
--- a/syncapi/consumers/receipts.go
+++ b/syncapi/consumers/receipts.go
@@ -28,6 +28,7 @@ import (
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -38,7 +39,7 @@ type OutputReceiptEventConsumer struct {
durable string
topic string
db storage.Database
- stream types.StreamProvider
+ stream streams.StreamProvider
notifier *notifier.Notifier
serverName gomatrixserverlib.ServerName
}
@@ -51,7 +52,7 @@ func NewOutputReceiptEventConsumer(
js nats.JetStreamContext,
store storage.Database,
notifier *notifier.Notifier,
- stream types.StreamProvider,
+ stream streams.StreamProvider,
) *OutputReceiptEventConsumer {
return &OutputReceiptEventConsumer{
ctx: process.Context(),
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 3756ad75..e5e8fe29 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -33,6 +33,7 @@ import (
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -45,8 +46,8 @@ type OutputRoomEventConsumer struct {
durable string
topic string
db storage.Database
- pduStream types.StreamProvider
- inviteStream types.StreamProvider
+ pduStream streams.StreamProvider
+ inviteStream streams.StreamProvider
notifier *notifier.Notifier
fts *fulltext.Search
}
@@ -58,8 +59,8 @@ func NewOutputRoomEventConsumer(
js nats.JetStreamContext,
store storage.Database,
notifier *notifier.Notifier,
- pduStream types.StreamProvider,
- inviteStream types.StreamProvider,
+ pduStream streams.StreamProvider,
+ inviteStream streams.StreamProvider,
rsAPI api.SyncRoomserverAPI,
fts *fulltext.Search,
) *OutputRoomEventConsumer {
@@ -449,8 +450,14 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.Head
}
stateKey := *event.StateKey()
- prevEvent, err := s.db.GetStateEvent(
- context.TODO(), event.RoomID(), event.Type(), stateKey,
+ snapshot, err := s.db.NewDatabaseSnapshot(s.ctx)
+ if err != nil {
+ return nil, err
+ }
+ defer snapshot.Rollback() // nolint:errcheck
+
+ prevEvent, err := snapshot.GetStateEvent(
+ s.ctx, event.RoomID(), event.Type(), stateKey,
)
if err != nil {
return event, err
diff --git a/syncapi/consumers/sendtodevice.go b/syncapi/consumers/sendtodevice.go
index c0b43225..49d84cca 100644
--- a/syncapi/consumers/sendtodevice.go
+++ b/syncapi/consumers/sendtodevice.go
@@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -43,7 +44,7 @@ type OutputSendToDeviceEventConsumer struct {
db storage.Database
keyAPI keyapi.SyncKeyAPI
serverName gomatrixserverlib.ServerName // our server name
- stream types.StreamProvider
+ stream streams.StreamProvider
notifier *notifier.Notifier
}
@@ -56,7 +57,7 @@ func NewOutputSendToDeviceEventConsumer(
store storage.Database,
keyAPI keyapi.SyncKeyAPI,
notifier *notifier.Notifier,
- stream types.StreamProvider,
+ stream streams.StreamProvider,
) *OutputSendToDeviceEventConsumer {
return &OutputSendToDeviceEventConsumer{
ctx: process.Context(),
diff --git a/syncapi/consumers/typing.go b/syncapi/consumers/typing.go
index 88db80f8..67a26239 100644
--- a/syncapi/consumers/typing.go
+++ b/syncapi/consumers/typing.go
@@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
+ "github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
@@ -36,7 +37,7 @@ type OutputTypingEventConsumer struct {
durable string
topic string
eduCache *caching.EDUCache
- stream types.StreamProvider
+ stream streams.StreamProvider
notifier *notifier.Notifier
}
@@ -48,7 +49,7 @@ func NewOutputTypingEventConsumer(
js nats.JetStreamContext,
eduCache *caching.EDUCache,
notifier *notifier.Notifier,
- stream types.StreamProvider,
+ stream streams.StreamProvider,
) *OutputTypingEventConsumer {
return &OutputTypingEventConsumer{
ctx: process.Context(),
diff --git a/syncapi/consumers/userapi.go b/syncapi/consumers/userapi.go
index c9b96f78..3c73dc1f 100644
--- a/syncapi/consumers/userapi.go
+++ b/syncapi/consumers/userapi.go
@@ -28,6 +28,7 @@ import (
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -40,7 +41,7 @@ type OutputNotificationDataConsumer struct {
topic string
db storage.Database
notifier *notifier.Notifier
- stream types.StreamProvider
+ stream streams.StreamProvider
}
// NewOutputNotificationDataConsumer creates a new consumer. Call
@@ -51,7 +52,7 @@ func NewOutputNotificationDataConsumer(
js nats.JetStreamContext,
store storage.Database,
notifier *notifier.Notifier,
- stream types.StreamProvider,
+ stream streams.StreamProvider,
) *OutputNotificationDataConsumer {
s := &OutputNotificationDataConsumer{
ctx: process.Context(),