aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--appservice/consumers/roomserver.go5
-rw-r--r--federationapi/consumers/roomserver.go7
-rw-r--r--roomserver/producers/roomevent.go13
-rw-r--r--setup/jetstream/streams.go7
-rw-r--r--test/testrig/jetstream.go9
-rw-r--r--userapi/consumers/roomserver.go7
6 files changed, 30 insertions, 18 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index d44f32b3..ac68f4bd 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -101,6 +101,11 @@ func (s *OutputRoomEventConsumer) onMessage(
log.WithField("appservice", state.ID).Tracef("Appservice worker received %d message(s) from roomserver", len(msgs))
events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(msgs))
for _, msg := range msgs {
+ // Only handle events we care about
+ receivedType := api.OutputType(msg.Header.Get(jetstream.RoomEventType))
+ if receivedType != api.OutputTypeNewRoomEvent && receivedType != api.OutputTypeNewInviteEvent {
+ continue
+ }
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go
index 349b50b0..a4273362 100644
--- a/federationapi/consumers/roomserver.go
+++ b/federationapi/consumers/roomserver.go
@@ -79,6 +79,13 @@ func (s *OutputRoomEventConsumer) Start() error {
// realises that it cannot update the room state using the deltas.
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
+ receivedType := api.OutputType(msg.Header.Get(jetstream.RoomEventType))
+
+ // Only handle events we care about
+ if receivedType != api.OutputTypeNewRoomEvent && receivedType != api.OutputTypeNewInboundPeek {
+ return true
+ }
+
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
diff --git a/roomserver/producers/roomevent.go b/roomserver/producers/roomevent.go
index 987e6c94..9c452198 100644
--- a/roomserver/producers/roomevent.go
+++ b/roomserver/producers/roomevent.go
@@ -17,12 +17,13 @@ package producers
import (
"encoding/json"
- "github.com/matrix-org/dendrite/roomserver/acls"
- "github.com/matrix-org/dendrite/roomserver/api"
- "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
+
+ "github.com/matrix-org/dendrite/roomserver/acls"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/jetstream"
)
var keyContentFields = map[string]string{
@@ -40,10 +41,8 @@ type RoomEventProducer struct {
func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.OutputEvent) error {
var err error
for _, update := range updates {
- msg := &nats.Msg{
- Subject: r.Topic,
- Header: nats.Header{},
- }
+ msg := nats.NewMsg(r.Topic)
+ msg.Header.Set(jetstream.RoomEventType, string(update.Type))
msg.Header.Set(jetstream.RoomID, roomID)
msg.Data, err = json.Marshal(update)
if err != nil {
diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go
index ee9810da..590f0cbd 100644
--- a/setup/jetstream/streams.go
+++ b/setup/jetstream/streams.go
@@ -9,9 +9,10 @@ import (
)
const (
- UserID = "user_id"
- RoomID = "room_id"
- EventID = "event_id"
+ UserID = "user_id"
+ RoomID = "room_id"
+ EventID = "event_id"
+ RoomEventType = "output_room_event_type"
)
var (
diff --git a/test/testrig/jetstream.go b/test/testrig/jetstream.go
index 74cf9506..b880eea4 100644
--- a/test/testrig/jetstream.go
+++ b/test/testrig/jetstream.go
@@ -4,10 +4,11 @@ import (
"encoding/json"
"testing"
+ "github.com/nats-io/nats.go"
+
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/jetstream"
- "github.com/nats-io/nats.go"
)
func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Msg) {
@@ -21,10 +22,8 @@ func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Ms
func NewOutputEventMsg(t *testing.T, base *base.BaseDendrite, roomID string, update api.OutputEvent) *nats.Msg {
t.Helper()
- msg := &nats.Msg{
- Subject: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent),
- Header: nats.Header{},
- }
+ msg := nats.NewMsg(base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent))
+ msg.Header.Set(jetstream.RoomEventType, string(update.Type))
msg.Header.Set(jetstream.RoomID, roomID)
var err error
msg.Data, err = json.Marshal(update)
diff --git a/userapi/consumers/roomserver.go b/userapi/consumers/roomserver.go
index 952de98f..a1287694 100644
--- a/userapi/consumers/roomserver.go
+++ b/userapi/consumers/roomserver.go
@@ -72,15 +72,16 @@ func (s *OutputRoomEventConsumer) Start() error {
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
+ // Only handle events we care about
+ if rsapi.OutputType(msg.Header.Get(jetstream.RoomEventType)) != rsapi.OutputTypeNewRoomEvent {
+ return true
+ }
var output rsapi.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return true
}
- if output.Type != rsapi.OutputTypeNewRoomEvent {
- return true
- }
event := output.NewRoomEvent.Event
if event == nil {
log.Errorf("userapi consumer: expected event")