aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-07-01 10:54:07 +0100
committerGitHub <noreply@github.com>2022-07-01 10:54:07 +0100
commitb50a24c666c4c45e1410dfc35d5ab2dc7e530a0f (patch)
tree04a71d4eaca38a78eda091ed5449bf2f492f73c1
parent89cd0e8fc13b040470aebe2eb4d36a9235b1473d (diff)
Roomserver producers package (#2546)
* Give the roomserver a producers package * Change init point * Populate ACLs API * Fix build issues * `RoomEventProducer` naming
-rw-r--r--roomserver/internal/alias.go3
-rw-r--r--roomserver/internal/api.go46
-rw-r--r--roomserver/internal/input/input.go87
-rw-r--r--roomserver/internal/input/input_events.go4
-rw-r--r--roomserver/internal/input/input_latest_events.go2
-rw-r--r--roomserver/internal/perform/perform_admin.go2
-rw-r--r--roomserver/internal/perform/perform_inbound_peek.go2
-rw-r--r--roomserver/internal/perform/perform_peek.go2
-rw-r--r--roomserver/internal/perform/perform_unpeek.go2
-rw-r--r--roomserver/producers/roomevent.go89
-rw-r--r--roomserver/roomserver.go1
11 files changed, 137 insertions, 103 deletions
diff --git a/roomserver/internal/alias.go b/roomserver/internal/alias.go
index f47ae47f..175bb931 100644
--- a/roomserver/internal/alias.go
+++ b/roomserver/internal/alias.go
@@ -216,11 +216,10 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
return err
}
- err = api.SendEvents(ctx, r.RSAPI, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false)
+ err = api.SendEvents(ctx, r, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false)
if err != nil {
return err
}
-
}
}
diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go
index acdaeef6..d59b8be7 100644
--- a/roomserver/internal/api.go
+++ b/roomserver/internal/api.go
@@ -12,8 +12,10 @@ import (
"github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/dendrite/roomserver/internal/perform"
"github.com/matrix-org/dendrite/roomserver/internal/query"
+ "github.com/matrix-org/dendrite/roomserver/producers"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@@ -49,17 +51,21 @@ type RoomserverInternalAPI struct {
JetStream nats.JetStreamContext
Durable string
InputRoomEventTopic string // JetStream topic for new input room events
- OutputRoomEventTopic string // JetStream topic for new output room events
+ OutputProducer *producers.RoomEventProducer
PerspectiveServerNames []gomatrixserverlib.ServerName
}
func NewRoomserverAPI(
processCtx *process.ProcessContext, cfg *config.RoomServer, roomserverDB storage.Database,
- consumer nats.JetStreamContext, nc *nats.Conn,
- inputRoomEventTopic, outputRoomEventTopic string,
+ js nats.JetStreamContext, nc *nats.Conn, inputRoomEventTopic string,
caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName,
) *RoomserverInternalAPI {
serverACLs := acls.NewServerACLs(roomserverDB)
+ producer := &producers.RoomEventProducer{
+ Topic: string(cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent)),
+ JetStream: js,
+ ACLs: serverACLs,
+ }
a := &RoomserverInternalAPI{
ProcessContext: processCtx,
DB: roomserverDB,
@@ -68,8 +74,8 @@ func NewRoomserverAPI(
ServerName: cfg.Matrix.ServerName,
PerspectiveServerNames: perspectiveServerNames,
InputRoomEventTopic: inputRoomEventTopic,
- OutputRoomEventTopic: outputRoomEventTopic,
- JetStream: consumer,
+ OutputProducer: producer,
+ JetStream: js,
NATSClient: nc,
Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"),
ServerACLs: serverACLs,
@@ -92,19 +98,19 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
r.KeyRing = keyRing
r.Inputer = &input.Inputer{
- Cfg: r.Cfg,
- ProcessContext: r.ProcessContext,
- DB: r.DB,
- InputRoomEventTopic: r.InputRoomEventTopic,
- OutputRoomEventTopic: r.OutputRoomEventTopic,
- JetStream: r.JetStream,
- NATSClient: r.NATSClient,
- Durable: nats.Durable(r.Durable),
- ServerName: r.Cfg.Matrix.ServerName,
- FSAPI: fsAPI,
- KeyRing: keyRing,
- ACLs: r.ServerACLs,
- Queryer: r.Queryer,
+ Cfg: r.Cfg,
+ ProcessContext: r.ProcessContext,
+ DB: r.DB,
+ InputRoomEventTopic: r.InputRoomEventTopic,
+ OutputProducer: r.OutputProducer,
+ JetStream: r.JetStream,
+ NATSClient: r.NATSClient,
+ Durable: nats.Durable(r.Durable),
+ ServerName: r.Cfg.Matrix.ServerName,
+ FSAPI: fsAPI,
+ KeyRing: keyRing,
+ ACLs: r.ServerACLs,
+ Queryer: r.Queryer,
}
r.Inviter = &perform.Inviter{
DB: r.DB,
@@ -199,7 +205,7 @@ func (r *RoomserverInternalAPI) PerformInvite(
if len(outputEvents) == 0 {
return nil
}
- return r.WriteOutputEvents(req.Event.RoomID(), outputEvents)
+ return r.OutputProducer.ProduceRoomEvents(req.Event.RoomID(), outputEvents)
}
func (r *RoomserverInternalAPI) PerformLeave(
@@ -215,7 +221,7 @@ func (r *RoomserverInternalAPI) PerformLeave(
if len(outputEvents) == 0 {
return nil
}
- return r.WriteOutputEvents(req.RoomID, outputEvents)
+ return r.OutputProducer.ProduceRoomEvents(req.RoomID, outputEvents)
}
func (r *RoomserverInternalAPI) PerformForget(
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index 600994c5..fa07c1d2 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -29,6 +29,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/acls"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/query"
+ "github.com/matrix-org/dendrite/roomserver/producers"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
@@ -37,16 +38,8 @@ import (
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
- log "github.com/sirupsen/logrus"
- "github.com/tidwall/gjson"
)
-var keyContentFields = map[string]string{
- "m.room.join_rules": "join_rule",
- "m.room.history_visibility": "history_visibility",
- "m.room.member": "membership",
-}
-
// Inputer is responsible for consuming from the roomserver input
// streams and processing the events. All input events are queued
// into a single NATS stream and the order is preserved strictly.
@@ -75,19 +68,19 @@ var keyContentFields = map[string]string{
// up, so they will do nothing until a new event comes in for B
// or C.
type Inputer struct {
- Cfg *config.RoomServer
- ProcessContext *process.ProcessContext
- DB storage.Database
- NATSClient *nats.Conn
- JetStream nats.JetStreamContext
- Durable nats.SubOpt
- ServerName gomatrixserverlib.ServerName
- FSAPI fedapi.RoomserverFederationAPI
- KeyRing gomatrixserverlib.JSONVerifier
- ACLs *acls.ServerACLs
- InputRoomEventTopic string
- OutputRoomEventTopic string
- workers sync.Map // room ID -> *worker
+ Cfg *config.RoomServer
+ ProcessContext *process.ProcessContext
+ DB storage.Database
+ NATSClient *nats.Conn
+ JetStream nats.JetStreamContext
+ Durable nats.SubOpt
+ ServerName gomatrixserverlib.ServerName
+ FSAPI fedapi.RoomserverFederationAPI
+ KeyRing gomatrixserverlib.JSONVerifier
+ ACLs *acls.ServerACLs
+ InputRoomEventTopic string
+ OutputProducer *producers.RoomEventProducer
+ workers sync.Map // room ID -> *worker
Queryer *query.Queryer
}
@@ -370,58 +363,6 @@ func (r *Inputer) InputRoomEvents(
}
}
-// WriteOutputEvents implements OutputRoomEventWriter
-func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
- var err error
- for _, update := range updates {
- msg := &nats.Msg{
- Subject: r.OutputRoomEventTopic,
- Header: nats.Header{},
- }
- msg.Header.Set(jetstream.RoomID, roomID)
- msg.Data, err = json.Marshal(update)
- if err != nil {
- return err
- }
- logger := log.WithFields(log.Fields{
- "room_id": roomID,
- "type": update.Type,
- })
- if update.NewRoomEvent != nil {
- eventType := update.NewRoomEvent.Event.Type()
- logger = logger.WithFields(log.Fields{
- "event_type": eventType,
- "event_id": update.NewRoomEvent.Event.EventID(),
- "adds_state": len(update.NewRoomEvent.AddsStateEventIDs),
- "removes_state": len(update.NewRoomEvent.RemovesStateEventIDs),
- "send_as_server": update.NewRoomEvent.SendAsServer,
- "sender": update.NewRoomEvent.Event.Sender(),
- })
- if update.NewRoomEvent.Event.StateKey() != nil {
- logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey())
- }
- contentKey := keyContentFields[eventType]
- if contentKey != "" {
- value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey)
- if value.Exists() {
- logger = logger.WithField("content_value", value.String())
- }
- }
-
- if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") {
- ev := update.NewRoomEvent.Event.Unwrap()
- defer r.ACLs.OnServerACLUpdate(ev)
- }
- }
- logger.Tracef("Producing to topic '%s'", r.OutputRoomEventTopic)
- if _, err := r.JetStream.PublishMsg(msg); err != nil {
- logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.OutputRoomEventTopic, err)
- return err
- }
- }
- return nil
-}
-
var roomserverInputBackpressure = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dendrite",
diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go
index ff05f798..743b1efe 100644
--- a/roomserver/internal/input/input_events.go
+++ b/roomserver/internal/input/input_events.go
@@ -381,7 +381,7 @@ func (r *Inputer) processRoomEvent(
return fmt.Errorf("r.updateLatestEvents: %w", err)
}
case api.KindOld:
- err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
+ err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{
{
Type: api.OutputTypeOldRoomEvent,
OldRoomEvent: &api.OutputOldRoomEvent{
@@ -400,7 +400,7 @@ func (r *Inputer) processRoomEvent(
// so notify downstream components to redact this event - they should have it if they've
// been tracking our output log.
if redactedEventID != "" {
- err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
+ err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{
{
Type: api.OutputTypeRedactedEvent,
RedactedEvent: &api.OutputRedactedEvent{
diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go
index e76f4ba8..f7d15fdb 100644
--- a/roomserver/internal/input/input_latest_events.go
+++ b/roomserver/internal/input/input_latest_events.go
@@ -192,7 +192,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
// necessary bookkeeping we'll keep the event sending synchronous for now.
- if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil {
+ if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID(), updates); err != nil {
return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
}
diff --git a/roomserver/internal/perform/perform_admin.go b/roomserver/internal/perform/perform_admin.go
index d3fb7109..1cb52966 100644
--- a/roomserver/internal/perform/perform_admin.go
+++ b/roomserver/internal/perform/perform_admin.go
@@ -219,7 +219,7 @@ func (r *Admin) PerformAdminEvacuateUser(
if len(outputEvents) == 0 {
continue
}
- if err := r.Inputer.WriteOutputEvents(roomID, outputEvents); err != nil {
+ if err := r.Inputer.OutputProducer.ProduceRoomEvents(roomID, outputEvents); err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("r.Inputer.WriteOutputEvents: %s", err),
diff --git a/roomserver/internal/perform/perform_inbound_peek.go b/roomserver/internal/perform/perform_inbound_peek.go
index d19fc838..32c81e84 100644
--- a/roomserver/internal/perform/perform_inbound_peek.go
+++ b/roomserver/internal/perform/perform_inbound_peek.go
@@ -113,7 +113,7 @@ func (r *InboundPeeker) PerformInboundPeek(
response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion))
}
- err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{
+ err = r.Inputer.OutputProducer.ProduceRoomEvents(request.RoomID, []api.OutputEvent{
{
Type: api.OutputTypeNewInboundPeek,
NewInboundPeek: &api.OutputNewInboundPeek{
diff --git a/roomserver/internal/perform/perform_peek.go b/roomserver/internal/perform/perform_peek.go
index 45e63888..5560916b 100644
--- a/roomserver/internal/perform/perform_peek.go
+++ b/roomserver/internal/perform/perform_peek.go
@@ -207,7 +207,7 @@ func (r *Peeker) performPeekRoomByID(
// TODO: handle federated peeks
- err = r.Inputer.WriteOutputEvents(roomID, []api.OutputEvent{
+ err = r.Inputer.OutputProducer.ProduceRoomEvents(roomID, []api.OutputEvent{
{
Type: api.OutputTypeNewPeek,
NewPeek: &api.OutputNewPeek{
diff --git a/roomserver/internal/perform/perform_unpeek.go b/roomserver/internal/perform/perform_unpeek.go
index 1057499c..1fe8d5a0 100644
--- a/roomserver/internal/perform/perform_unpeek.go
+++ b/roomserver/internal/perform/perform_unpeek.go
@@ -96,7 +96,7 @@ func (r *Unpeeker) performUnpeekRoomByID(
// TODO: handle federated peeks
- err = r.Inputer.WriteOutputEvents(req.RoomID, []api.OutputEvent{
+ err = r.Inputer.OutputProducer.ProduceRoomEvents(req.RoomID, []api.OutputEvent{
{
Type: api.OutputTypeRetirePeek,
RetirePeek: &api.OutputRetirePeek{
diff --git a/roomserver/producers/roomevent.go b/roomserver/producers/roomevent.go
new file mode 100644
index 00000000..987e6c94
--- /dev/null
+++ b/roomserver/producers/roomevent.go
@@ -0,0 +1,89 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producers
+
+import (
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/roomserver/acls"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+ "github.com/tidwall/gjson"
+)
+
+var keyContentFields = map[string]string{
+ "m.room.join_rules": "join_rule",
+ "m.room.history_visibility": "history_visibility",
+ "m.room.member": "membership",
+}
+
+type RoomEventProducer struct {
+ Topic string
+ ACLs *acls.ServerACLs
+ JetStream nats.JetStreamContext
+}
+
+func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.OutputEvent) error {
+ var err error
+ for _, update := range updates {
+ msg := &nats.Msg{
+ Subject: r.Topic,
+ Header: nats.Header{},
+ }
+ msg.Header.Set(jetstream.RoomID, roomID)
+ msg.Data, err = json.Marshal(update)
+ if err != nil {
+ return err
+ }
+ logger := log.WithFields(log.Fields{
+ "room_id": roomID,
+ "type": update.Type,
+ })
+ if update.NewRoomEvent != nil {
+ eventType := update.NewRoomEvent.Event.Type()
+ logger = logger.WithFields(log.Fields{
+ "event_type": eventType,
+ "event_id": update.NewRoomEvent.Event.EventID(),
+ "adds_state": len(update.NewRoomEvent.AddsStateEventIDs),
+ "removes_state": len(update.NewRoomEvent.RemovesStateEventIDs),
+ "send_as_server": update.NewRoomEvent.SendAsServer,
+ "sender": update.NewRoomEvent.Event.Sender(),
+ })
+ if update.NewRoomEvent.Event.StateKey() != nil {
+ logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey())
+ }
+ contentKey := keyContentFields[eventType]
+ if contentKey != "" {
+ value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey)
+ if value.Exists() {
+ logger = logger.WithField("content_value", value.String())
+ }
+ }
+
+ if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") {
+ ev := update.NewRoomEvent.Event.Unwrap()
+ defer r.ACLs.OnServerACLUpdate(ev)
+ }
+ }
+ logger.Tracef("Producing to topic '%s'", r.Topic)
+ if _, err := r.JetStream.PublishMsg(msg); err != nil {
+ logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.Topic, err)
+ return err
+ }
+ }
+ return nil
+}
diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go
index 1480e894..eb68100f 100644
--- a/roomserver/roomserver.go
+++ b/roomserver/roomserver.go
@@ -55,7 +55,6 @@ func NewInternalAPI(
return internal.NewRoomserverAPI(
base.ProcessContext, cfg, roomserverDB, js, nc,
cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
- cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
base.Caches, perspectiveServerNames,
)
}