aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/consumers')
-rw-r--r--syncapi/consumers/clientapi.go2
-rw-r--r--syncapi/consumers/receipts.go (renamed from syncapi/consumers/eduserver_receipts.go)24
-rw-r--r--syncapi/consumers/sendtodevice.go (renamed from syncapi/consumers/eduserver_sendtodevice.go)26
-rw-r--r--syncapi/consumers/typing.go (renamed from syncapi/consumers/eduserver_typing.go)51
4 files changed, 57 insertions, 46 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index 40c1cd3d..c28da460 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -62,7 +62,7 @@ func NewOutputClientDataConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
- durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIAccountDataConsumer"),
db: store,
notifier: notifier,
stream: stream,
diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/receipts.go
index ab79998e..6bb0747f 100644
--- a/syncapi/consumers/eduserver_receipts.go
+++ b/syncapi/consumers/receipts.go
@@ -17,11 +17,10 @@ package consumers
import (
"context"
"database/sql"
- "encoding/json"
"fmt"
+ "strconv"
"github.com/getsentry/sentry-go"
- "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
@@ -63,7 +62,7 @@ func NewOutputReceiptEventConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
- durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIReceiptConsumer"),
db: store,
notifier: notifier,
stream: stream,
@@ -72,7 +71,7 @@ func NewOutputReceiptEventConsumer(
}
}
-// Start consuming from EDU api
+// Start consuming receipts events.
func (s *OutputReceiptEventConsumer) Start() error {
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
@@ -81,14 +80,23 @@ func (s *OutputReceiptEventConsumer) Start() error {
}
func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
- var output api.OutputReceiptEvent
- if err := json.Unmarshal(msg.Data, &output); err != nil {
+ output := types.OutputReceiptEvent{
+ UserID: msg.Header.Get(jetstream.UserID),
+ RoomID: msg.Header.Get(jetstream.RoomID),
+ EventID: msg.Header.Get(jetstream.EventID),
+ Type: msg.Header.Get("type"),
+ }
+
+ timestamp, err := strconv.Atoi(msg.Header.Get("timestamp"))
+ if err != nil {
// If the message was invalid, log it and move on to the next message in the stream
- log.WithError(err).Errorf("EDU server output log: message parse failure")
+ log.WithError(err).Errorf("output log: message parse failure")
sentry.CaptureException(err)
return true
}
+ output.Timestamp = gomatrixserverlib.Timestamp(timestamp)
+
streamPos, err := s.db.StoreReceipt(
s.ctx,
output.RoomID,
@@ -117,7 +125,7 @@ func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Ms
return true
}
-func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output api.OutputReceiptEvent) error {
+func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output types.OutputReceiptEvent) error {
if output.Type != "m.read" {
return nil
}
diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/sendtodevice.go
index bdbe7735..0b9153fc 100644
--- a/syncapi/consumers/eduserver_sendtodevice.go
+++ b/syncapi/consumers/sendtodevice.go
@@ -19,7 +19,6 @@ import (
"encoding/json"
"github.com/getsentry/sentry-go"
- "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
@@ -58,7 +57,7 @@ func NewOutputSendToDeviceEventConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
- durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPISendToDeviceConsumer"),
db: store,
serverName: cfg.Matrix.ServerName,
notifier: notifier,
@@ -66,7 +65,7 @@ func NewOutputSendToDeviceEventConsumer(
}
}
-// Start consuming from EDU api
+// Start consuming send-to-device events.
func (s *OutputSendToDeviceEventConsumer) Start() error {
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
@@ -75,15 +74,8 @@ func (s *OutputSendToDeviceEventConsumer) Start() error {
}
func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
- var output api.OutputSendToDeviceEvent
- if err := json.Unmarshal(msg.Data, &output); err != nil {
- // If the message was invalid, log it and move on to the next message in the stream
- log.WithError(err).Errorf("EDU server output log: message parse failure")
- sentry.CaptureException(err)
- return true
- }
-
- _, domain, err := gomatrixserverlib.SplitID('@', output.UserID)
+ userID := msg.Header.Get(jetstream.UserID)
+ _, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
sentry.CaptureException(err)
return true
@@ -92,12 +84,20 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *na
return true
}
+ var output types.OutputSendToDeviceEvent
+ if err = json.Unmarshal(msg.Data, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("output log: message parse failure")
+ sentry.CaptureException(err)
+ return true
+ }
+
util.GetLogger(context.TODO()).WithFields(log.Fields{
"sender": output.Sender,
"user_id": output.UserID,
"device_id": output.DeviceID,
"event_type": output.Type,
- }).Info("sync API received send-to-device event from EDU server")
+ }).Debugf("sync API received send-to-device event from the clientapi/federationsender")
streamPos, err := s.db.StoreNewSendForDeviceMessage(
s.ctx, output.UserID, output.DeviceID, output.SendToDeviceEvent,
diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/typing.go
index c2828c7f..48e484ec 100644
--- a/syncapi/consumers/eduserver_typing.go
+++ b/syncapi/consumers/typing.go
@@ -16,16 +16,14 @@ package consumers
import (
"context"
- "encoding/json"
+ "strconv"
+ "time"
- "github.com/getsentry/sentry-go"
- "github.com/matrix-org/dendrite/eduserver/api"
- "github.com/matrix-org/dendrite/eduserver/cache"
+ "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
- "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
@@ -37,7 +35,7 @@ type OutputTypingEventConsumer struct {
jetstream nats.JetStreamContext
durable string
topic string
- eduCache *cache.EDUCache
+ eduCache *caching.EDUCache
stream types.StreamProvider
notifier *notifier.Notifier
}
@@ -48,8 +46,7 @@ func NewOutputTypingEventConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
js nats.JetStreamContext,
- store storage.Database,
- eduCache *cache.EDUCache,
+ eduCache *caching.EDUCache,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputTypingEventConsumer {
@@ -57,14 +54,14 @@ func NewOutputTypingEventConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
- durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPITypingConsumer"),
eduCache: eduCache,
notifier: notifier,
stream: stream,
}
}
-// Start consuming from EDU api
+// Start consuming typing events.
func (s *OutputTypingEventConsumer) Start() error {
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
@@ -73,34 +70,40 @@ func (s *OutputTypingEventConsumer) Start() error {
}
func (s *OutputTypingEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
- var output api.OutputTypingEvent
- if err := json.Unmarshal(msg.Data, &output); err != nil {
- // If the message was invalid, log it and move on to the next message in the stream
- log.WithError(err).Errorf("EDU server output log: message parse failure")
- sentry.CaptureException(err)
+ roomID := msg.Header.Get(jetstream.RoomID)
+ userID := msg.Header.Get(jetstream.UserID)
+ typing, err := strconv.ParseBool(msg.Header.Get("typing"))
+ if err != nil {
+ log.WithError(err).Errorf("output log: typing parse failure")
+ return true
+ }
+ timeout, err := strconv.Atoi(msg.Header.Get("timeout_ms"))
+ if err != nil {
+ log.WithError(err).Errorf("output log: timeout_ms parse failure")
return true
}
log.WithFields(log.Fields{
- "room_id": output.Event.RoomID,
- "user_id": output.Event.UserID,
- "typing": output.Event.Typing,
- }).Debug("received data from EDU server")
+ "room_id": roomID,
+ "user_id": userID,
+ "typing": typing,
+ "timeout": timeout,
+ }).Debug("syncapi received EDU data from client api")
var typingPos types.StreamPosition
- typingEvent := output.Event
- if typingEvent.Typing {
+ if typing {
+ expiry := time.Now().Add(time.Duration(timeout) * time.Millisecond)
typingPos = types.StreamPosition(
- s.eduCache.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime),
+ s.eduCache.AddTypingUser(userID, roomID, &expiry),
)
} else {
typingPos = types.StreamPosition(
- s.eduCache.RemoveUser(typingEvent.UserID, typingEvent.RoomID),
+ s.eduCache.RemoveUser(userID, roomID),
)
}
s.stream.Advance(typingPos)
- s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos})
+ s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: typingPos})
return true
}