aboutsummaryrefslogtreecommitdiff
path: root/syncapi/producers/userapi_streamevent.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/producers/userapi_streamevent.go')
-rw-r--r--syncapi/producers/userapi_streamevent.go60
1 files changed, 60 insertions, 0 deletions
diff --git a/syncapi/producers/userapi_streamevent.go b/syncapi/producers/userapi_streamevent.go
new file mode 100644
index 00000000..2bbd19c0
--- /dev/null
+++ b/syncapi/producers/userapi_streamevent.go
@@ -0,0 +1,60 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producers
+
+import (
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+)
+
+// UserAPIProducer produces events for the user API server to consume
+type UserAPIStreamEventProducer struct {
+ Topic string
+ JetStream nats.JetStreamContext
+}
+
+// SendData sends account data to the user API server
+func (p *UserAPIStreamEventProducer) SendStreamEvent(roomID string, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) error {
+ m := &nats.Msg{
+ Subject: p.Topic,
+ Header: nats.Header{},
+ }
+ m.Header.Set(jetstream.RoomID, roomID)
+
+ data := types.StreamedEvent{
+ Event: event,
+ StreamPosition: pos,
+ }
+ var err error
+ m.Data, err = json.Marshal(data)
+ if err != nil {
+ return err
+ }
+
+ log.WithFields(log.Fields{
+ "room_id": roomID,
+ "event_id": event.EventID(),
+ "event_type": event.Type(),
+ "stream_pos": pos,
+ }).Tracef("Producing to topic '%s'", p.Topic)
+
+ _, err = p.JetStream.PublishMsg(m)
+ return err
+}