aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--appservice/consumers/roomserver.go4
-rw-r--r--cmd/dendrite-monolith-server/main.go4
-rw-r--r--federationapi/consumers/eduserver.go8
-rw-r--r--federationapi/consumers/roomserver.go4
-rw-r--r--federationapi/federationapi_keys_test.go9
-rw-r--r--roomserver/internal/api.go1
-rw-r--r--roomserver/internal/input/input.go3
-rw-r--r--setup/config/config_jetstream.go6
-rw-r--r--setup/jetstream/nats.go7
-rw-r--r--syncapi/consumers/clientapi.go4
-rw-r--r--syncapi/consumers/eduserver_receipts.go4
-rw-r--r--syncapi/consumers/eduserver_sendtodevice.go4
-rw-r--r--syncapi/consumers/eduserver_typing.go4
-rw-r--r--syncapi/consumers/roomserver.go4
14 files changed, 51 insertions, 15 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index 139b5724..8aea5c34 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -34,6 +34,7 @@ import (
type OutputRoomEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
+ durable nats.SubOpt
topic string
asDB storage.Database
rsAPI api.RoomserverInternalAPI
@@ -54,6 +55,7 @@ func NewOutputRoomEventConsumer(
return &OutputRoomEventConsumer{
ctx: process.Context(),
jetstream: js,
+ durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"),
topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent),
asDB: appserviceDB,
rsAPI: rsAPI,
@@ -64,7 +66,7 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
- _, err := s.jetstream.Subscribe(s.topic, s.onMessage)
+ _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}
diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go
index 08851734..4d0598f3 100644
--- a/cmd/dendrite-monolith-server/main.go
+++ b/cmd/dendrite-monolith-server/main.go
@@ -99,10 +99,6 @@ func main() {
}
keyRing := fsAPI.KeyRing()
- // The underlying roomserver implementation needs to be able to call the fedsender.
- // This is different to rsAPI which can be the http client which doesn't need this dependency
- rsImpl.SetFederationAPI(fsAPI, keyRing)
-
keyImpl := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
keyAPI := keyImpl
if base.UseHTTPAPIs {
diff --git a/federationapi/consumers/eduserver.go b/federationapi/consumers/eduserver.go
index 9e52acef..c3e5b4d4 100644
--- a/federationapi/consumers/eduserver.go
+++ b/federationapi/consumers/eduserver.go
@@ -34,6 +34,7 @@ import (
type OutputEDUConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
+ durable nats.SubOpt
db storage.Database
queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName
@@ -56,6 +57,7 @@ func NewOutputEDUConsumer(
queues: queues,
db: store,
ServerName: cfg.Matrix.ServerName,
+ durable: cfg.Matrix.JetStream.Durable("FederationAPIEDUServerConsumer"),
typingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
sendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
@@ -64,13 +66,13 @@ func NewOutputEDUConsumer(
// Start consuming from EDU servers
func (t *OutputEDUConsumer) Start() error {
- if _, err := t.jetstream.Subscribe(t.typingTopic, t.onTypingEvent); err != nil {
+ if _, err := t.jetstream.Subscribe(t.typingTopic, t.onTypingEvent, t.durable); err != nil {
return err
}
- if _, err := t.jetstream.Subscribe(t.sendToDeviceTopic, t.onSendToDeviceEvent); err != nil {
+ if _, err := t.jetstream.Subscribe(t.sendToDeviceTopic, t.onSendToDeviceEvent, t.durable); err != nil {
return err
}
- if _, err := t.jetstream.Subscribe(t.receiptTopic, t.onReceiptEvent); err != nil {
+ if _, err := t.jetstream.Subscribe(t.receiptTopic, t.onReceiptEvent, t.durable); err != nil {
return err
}
return nil
diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go
index 12410bb7..632adae3 100644
--- a/federationapi/consumers/roomserver.go
+++ b/federationapi/consumers/roomserver.go
@@ -37,6 +37,7 @@ type OutputRoomEventConsumer struct {
cfg *config.FederationAPI
rsAPI api.RoomserverInternalAPI
jetstream nats.JetStreamContext
+ durable nats.SubOpt
db storage.Database
queues *queue.OutgoingQueues
topic string
@@ -58,13 +59,14 @@ func NewOutputRoomEventConsumer(
db: store,
queues: queues,
rsAPI: rsAPI,
+ durable: cfg.Matrix.JetStream.Durable("FederationAPIRoomServerConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
}
}
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
- _, err := s.jetstream.Subscribe(s.topic, s.onMessage)
+ _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}
diff --git a/federationapi/federationapi_keys_test.go b/federationapi/federationapi_keys_test.go
index b9503963..4774c882 100644
--- a/federationapi/federationapi_keys_test.go
+++ b/federationapi/federationapi_keys_test.go
@@ -68,6 +68,13 @@ func TestMain(m *testing.M) {
panic("can't create cache: " + err.Error())
}
+ // Create a temporary directory for JetStream.
+ d, err := ioutil.TempDir("./", "jetstream*")
+ if err != nil {
+ panic(err)
+ }
+ defer os.RemoveAll(d)
+
// Draw up just enough Dendrite config for the server key
// API to work.
cfg := &config.Dendrite{}
@@ -75,6 +82,8 @@ func TestMain(m *testing.M) {
cfg.Global.ServerName = gomatrixserverlib.ServerName(s.name)
cfg.Global.PrivateKey = testPriv
cfg.Global.JetStream.InMemory = true
+ cfg.Global.JetStream.TopicPrefix = string(s.name[:1])
+ cfg.Global.JetStream.StoragePath = config.Path(d)
cfg.Global.KeyID = serverKeyID
cfg.Global.KeyValidityPeriod = s.validity
cfg.FederationAPI.Database.ConnectionString = config.DataSource("file::memory:")
diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go
index e370f7e4..cf2e59c6 100644
--- a/roomserver/internal/api.go
+++ b/roomserver/internal/api.go
@@ -67,6 +67,7 @@ func NewRoomserverAPI(
InputRoomEventTopic: inputRoomEventTopic,
OutputRoomEventTopic: outputRoomEventTopic,
JetStream: consumer,
+ Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"),
ServerName: cfg.Matrix.ServerName,
ACLs: serverACLs,
},
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index dbff5fdd..57e51055 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -43,6 +43,7 @@ var keyContentFields = map[string]string{
type Inputer struct {
DB storage.Database
JetStream nats.JetStreamContext
+ Durable nats.SubOpt
ServerName gomatrixserverlib.ServerName
ACLs *acls.ServerACLs
InputRoomEventTopic string
@@ -85,6 +86,8 @@ func (r *Inputer) Start() error {
// or nak them within a certain amount of time. This stops that from
// happening, so we don't end up doing a lot of unnecessary duplicate work.
nats.MaxDeliver(0),
+ // Use a durable named consumer.
+ r.Durable,
)
return err
}
diff --git a/setup/config/config_jetstream.go b/setup/config/config_jetstream.go
index 0bd84899..94e2d88b 100644
--- a/setup/config/config_jetstream.go
+++ b/setup/config/config_jetstream.go
@@ -2,6 +2,8 @@ package config
import (
"fmt"
+
+ "github.com/nats-io/nats.go"
)
type JetStream struct {
@@ -23,6 +25,10 @@ func (c *JetStream) TopicFor(name string) string {
return fmt.Sprintf("%s%s", c.TopicPrefix, name)
}
+func (c *JetStream) Durable(name string) nats.SubOpt {
+ return nats.Durable(c.TopicFor(name))
+}
+
func (c *JetStream) Defaults(generate bool) {
c.Addresses = []string{}
c.TopicPrefix = "Dendrite"
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index eec68d82..6dbbd1f4 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -75,13 +75,18 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContex
}
if info == nil {
stream.Subjects = []string{name}
+
// If we're trying to keep everything in memory (e.g. unit tests)
// then overwrite the storage policy.
if cfg.InMemory {
stream.Storage = nats.MemoryStorage
}
- if _, err = s.AddStream(stream); err != nil {
+ // Namespace the streams without modifying the original streams
+ // array, otherwise we end up with namespaces on namespaces.
+ namespaced := *stream
+ namespaced.Name = name
+ if _, err = s.AddStream(&namespaced); err != nil {
logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream")
}
}
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index 85710cdd..1ec9beb0 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -34,6 +34,7 @@ import (
type OutputClientDataConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
+ durable nats.SubOpt
topic string
db storage.Database
stream types.StreamProvider
@@ -53,6 +54,7 @@ func NewOutputClientDataConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"),
db: store,
notifier: notifier,
stream: stream,
@@ -61,7 +63,7 @@ func NewOutputClientDataConsumer(
// Start consuming from room servers
func (s *OutputClientDataConsumer) Start() error {
- _, err := s.jetstream.Subscribe(s.topic, s.onMessage)
+ _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}
diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go
index 582e1d64..57d69d6f 100644
--- a/syncapi/consumers/eduserver_receipts.go
+++ b/syncapi/consumers/eduserver_receipts.go
@@ -34,6 +34,7 @@ import (
type OutputReceiptEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
+ durable nats.SubOpt
topic string
db storage.Database
stream types.StreamProvider
@@ -54,6 +55,7 @@ func NewOutputReceiptEventConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"),
db: store,
notifier: notifier,
stream: stream,
@@ -62,7 +64,7 @@ func NewOutputReceiptEventConsumer(
// Start consuming from EDU api
func (s *OutputReceiptEventConsumer) Start() error {
- _, err := s.jetstream.Subscribe(s.topic, s.onMessage)
+ _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}
diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go
index 6579c303..54e689fa 100644
--- a/syncapi/consumers/eduserver_sendtodevice.go
+++ b/syncapi/consumers/eduserver_sendtodevice.go
@@ -36,6 +36,7 @@ import (
type OutputSendToDeviceEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
+ durable nats.SubOpt
topic string
db storage.Database
serverName gomatrixserverlib.ServerName // our server name
@@ -57,6 +58,7 @@ func NewOutputSendToDeviceEventConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"),
db: store,
serverName: cfg.Matrix.ServerName,
notifier: notifier,
@@ -66,7 +68,7 @@ func NewOutputSendToDeviceEventConsumer(
// Start consuming from EDU api
func (s *OutputSendToDeviceEventConsumer) Start() error {
- _, err := s.jetstream.Subscribe(s.topic, s.onMessage)
+ _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}
diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go
index 487befe8..de2f6f95 100644
--- a/syncapi/consumers/eduserver_typing.go
+++ b/syncapi/consumers/eduserver_typing.go
@@ -35,6 +35,7 @@ import (
type OutputTypingEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
+ durable nats.SubOpt
topic string
eduCache *cache.EDUCache
stream types.StreamProvider
@@ -56,6 +57,7 @@ func NewOutputTypingEventConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"),
eduCache: eduCache,
notifier: notifier,
stream: stream,
@@ -64,7 +66,7 @@ func NewOutputTypingEventConsumer(
// Start consuming from EDU api
func (s *OutputTypingEventConsumer) Start() error {
- _, err := s.jetstream.Subscribe(s.topic, s.onMessage)
+ _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 5b008e3d..6b3ebe53 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -38,6 +38,7 @@ type OutputRoomEventConsumer struct {
cfg *config.SyncAPI
rsAPI api.RoomserverInternalAPI
jetstream nats.JetStreamContext
+ durable nats.SubOpt
topic string
db storage.Database
pduStream types.StreamProvider
@@ -61,6 +62,7 @@ func NewOutputRoomEventConsumer(
cfg: cfg,
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIRoomServerConsumer"),
db: store,
notifier: notifier,
pduStream: pduStream,
@@ -71,7 +73,7 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
- _, err := s.jetstream.Subscribe(s.topic, s.onMessage)
+ _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}