aboutsummaryrefslogtreecommitdiff
path: root/roomserver
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-02-17 15:58:54 +0000
committerGitHub <noreply@github.com>2022-02-17 15:58:54 +0000
commit0b123b29f5304603d32d790512c091ac942fb37d (patch)
treed452fd12e3e2c4f347877effd11efa4041e1eb08 /roomserver
parent140077265e2842bf8e2d2c6399343490740cd8a6 (diff)
Use process context for roomserver input (#2198)
Diffstat (limited to 'roomserver')
-rw-r--r--roomserver/internal/api.go10
-rw-r--r--roomserver/internal/input/input.go4
-rw-r--r--roomserver/roomserver.go2
3 files changed, 11 insertions, 5 deletions
diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go
index fd963ad8..e58f11c1 100644
--- a/roomserver/internal/api.go
+++ b/roomserver/internal/api.go
@@ -14,6 +14,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
@@ -32,6 +33,7 @@ type RoomserverInternalAPI struct {
*perform.Publisher
*perform.Backfiller
*perform.Forgetter
+ ProcessContext *process.ProcessContext
DB storage.Database
Cfg *config.RoomServer
Cache caching.RoomServerCaches
@@ -48,12 +50,13 @@ type RoomserverInternalAPI struct {
}
func NewRoomserverAPI(
- cfg *config.RoomServer, roomserverDB storage.Database, consumer nats.JetStreamContext,
- inputRoomEventTopic, outputRoomEventTopic string, caches caching.RoomServerCaches,
- perspectiveServerNames []gomatrixserverlib.ServerName,
+ processCtx *process.ProcessContext, cfg *config.RoomServer, roomserverDB storage.Database,
+ consumer nats.JetStreamContext, inputRoomEventTopic, outputRoomEventTopic string,
+ caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName,
) *RoomserverInternalAPI {
serverACLs := acls.NewServerACLs(roomserverDB)
a := &RoomserverInternalAPI{
+ ProcessContext: processCtx,
DB: roomserverDB,
Cfg: cfg,
Cache: caches,
@@ -83,6 +86,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
r.KeyRing = keyRing
r.Inputer = &input.Inputer{
+ ProcessContext: r.ProcessContext,
DB: r.DB,
InputRoomEventTopic: r.InputRoomEventTopic,
OutputRoomEventTopic: r.OutputRoomEventTopic,
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index 5bdec0a2..22e4b67a 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
@@ -59,6 +60,7 @@ var keyContentFields = map[string]string{
}
type Inputer struct {
+ ProcessContext *process.ProcessContext
DB storage.Database
JetStream nats.JetStreamContext
Durable nats.SubOpt
@@ -115,7 +117,7 @@ func (r *Inputer) Start() error {
_ = msg.InProgress() // resets the acknowledgement wait timer
defer eventsInProgress.Delete(index)
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
- action, err := r.processRoomEventUsingUpdater(context.Background(), roomID, &inputRoomEvent)
+ action, err := r.processRoomEventUsingUpdater(r.ProcessContext.Context(), roomID, &inputRoomEvent)
if err != nil {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
sentry.CaptureException(err)
diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go
index e1b84b80..950c6b4e 100644
--- a/roomserver/roomserver.go
+++ b/roomserver/roomserver.go
@@ -53,7 +53,7 @@ func NewInternalAPI(
js := jetstream.Prepare(&cfg.Matrix.JetStream)
return internal.NewRoomserverAPI(
- cfg, roomserverDB, js,
+ base.ProcessContext, cfg, roomserverDB, js,
cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent),
cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
base.Caches, perspectiveServerNames,