aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/consumers')
-rw-r--r--syncapi/consumers/eduserver_receipts.go2
-rw-r--r--syncapi/consumers/eduserver_sendtodevice.go6
-rw-r--r--syncapi/consumers/eduserver_typing.go9
-rw-r--r--syncapi/consumers/roomserver.go12
4 files changed, 14 insertions, 15 deletions
diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go
index 5c286cf0..88334b65 100644
--- a/syncapi/consumers/eduserver_receipts.go
+++ b/syncapi/consumers/eduserver_receipts.go
@@ -88,7 +88,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
return err
}
// update stream position
- s.notifier.OnNewReceipt(types.StreamingToken{ReceiptPosition: streamPos})
+ s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
return nil
}
diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go
index 0c3f52cd..a375baf8 100644
--- a/syncapi/consumers/eduserver_sendtodevice.go
+++ b/syncapi/consumers/eduserver_sendtodevice.go
@@ -94,10 +94,8 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
"event_type": output.Type,
}).Info("sync API received send-to-device event from EDU server")
- streamPos := s.db.AddSendToDevice()
-
- _, err = s.db.StoreNewSendForDeviceMessage(
- context.TODO(), streamPos, output.UserID, output.DeviceID, output.SendToDeviceEvent,
+ streamPos, err := s.db.StoreNewSendForDeviceMessage(
+ context.TODO(), output.UserID, output.DeviceID, output.SendToDeviceEvent,
)
if err != nil {
log.WithError(err).Errorf("failed to store send-to-device message")
diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go
index 885e7fd1..28574b50 100644
--- a/syncapi/consumers/eduserver_typing.go
+++ b/syncapi/consumers/eduserver_typing.go
@@ -64,12 +64,7 @@ func NewOutputTypingEventConsumer(
// Start consuming from EDU api
func (s *OutputTypingEventConsumer) Start() error {
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
- s.notifier.OnNewEvent(
- nil, roomID, nil,
- types.StreamingToken{
- TypingPosition: types.StreamPosition(latestSyncPosition),
- },
- )
+ s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: types.StreamPosition(latestSyncPosition)})
})
return s.typingConsumer.Start()
@@ -97,6 +92,6 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
}
- s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.StreamingToken{TypingPosition: typingPos})
+ s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos})
return nil
}
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index be84a281..3b1f1582 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -259,6 +259,12 @@ func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gom
func (s *OutputRoomEventConsumer) onNewInviteEvent(
ctx context.Context, msg api.OutputNewInviteEvent,
) error {
+ if msg.Event.StateKey() == nil {
+ log.WithFields(log.Fields{
+ "event": string(msg.Event.JSON()),
+ }).Panicf("roomserver output log: invite has no state key")
+ return nil
+ }
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
if err != nil {
// panic rather than continue with an inconsistent database
@@ -269,14 +275,14 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure")
return nil
}
- s.notifier.OnNewEvent(msg.Event, "", nil, types.StreamingToken{PDUPosition: pduPos})
+ s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey())
return nil
}
func (s *OutputRoomEventConsumer) onRetireInviteEvent(
ctx context.Context, msg api.OutputRetireInviteEvent,
) error {
- sp, err := s.db.RetireInviteEvent(ctx, msg.EventID)
+ pduPos, err := s.db.RetireInviteEvent(ctx, msg.EventID)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
@@ -287,7 +293,7 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
}
// Notify any active sync requests that the invite has been retired.
// Invites share the same stream counter as PDUs
- s.notifier.OnNewEvent(nil, "", []string{msg.TargetUserID}, types.StreamingToken{PDUPosition: sp})
+ s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID)
return nil
}