aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers/clientapi.go
diff options
context:
space:
mode:
authorAlex Chen <Cnly@users.noreply.github.com>2019-07-12 22:59:53 +0800
committerGitHub <noreply@github.com>2019-07-12 22:59:53 +0800
commit29841bed6b1a88787211368e6052a87a658c5714 (patch)
tree85f5dcbcfdeea7ee7c6db74379df31991cd6c9c8 /syncapi/consumers/clientapi.go
parentf8463063ac4f45e61dfd0b27647bf00a8d05daa1 (diff)
Add typing notifications to /sync responses - fixes #635 (#718)
This PR adds a new consumer for typing notifications in syncapi. It also brings changes to syncserver.go and some related files so EDUs can better fit in /sync responses. Fixes #635. Fixes #574.
Diffstat (limited to 'syncapi/consumers/clientapi.go')
-rw-r--r--syncapi/consumers/clientapi.go9
1 files changed, 5 insertions, 4 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index d05a7692..f0db5642 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
+ "github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
)
@@ -29,7 +30,7 @@ import (
// OutputClientDataConsumer consumes events that originated in the client API server.
type OutputClientDataConsumer struct {
clientAPIConsumer *common.ContinualConsumer
- db *storage.SyncServerDatabase
+ db *storage.SyncServerDatasource
notifier *sync.Notifier
}
@@ -38,7 +39,7 @@ func NewOutputClientDataConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
- store *storage.SyncServerDatabase,
+ store *storage.SyncServerDatasource,
) *OutputClientDataConsumer {
consumer := common.ContinualConsumer{
@@ -78,7 +79,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": output.RoomID,
}).Info("received data from client API server")
- syncStreamPos, err := s.db.UpsertAccountData(
+ pduPos, err := s.db.UpsertAccountData(
context.TODO(), string(msg.Key), output.RoomID, output.Type,
)
if err != nil {
@@ -89,7 +90,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Panicf("could not save account data")
}
- s.notifier.OnNewEvent(nil, string(msg.Key), syncStreamPos)
+ s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.SyncPosition{PDUPosition: pduPos})
return nil
}