aboutsummaryrefslogtreecommitdiff
path: root/syncapi/syncapi.go
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2021-01-08 16:59:06 +0000
committerGitHub <noreply@github.com>2021-01-08 16:59:06 +0000
commitb5a8935042dfb358f4176bc1ca46d0b8ebd62615 (patch)
treeb3b0409cc184f6392eb0230f2127d10d0752b4af /syncapi/syncapi.go
parent56a7839aedfdf849193bf25c0e2fcd8f4a0146d8 (diff)
Sync refactor — Part 1 (#1688)
* It's half-alive * Wakeups largely working * Other tweaks, typing works * Fix bugs, add receipt stream * Delete notifier, other tweaks * Dedupe a bit, add a template for the invite stream * Clean up, add templates for other streams * Don't leak channels * Bring forward some more PDU logic, clean up other places * Add some more wakeups * Use addRoomDeltaToResponse * Log tweaks, typing fixed? * Fix timed out syncs * Don't reset next batch position on timeout * Add account data stream/position * End of day * Fix complete sync for receipt, typing * Streams package * Clean up a bit * Complete sync send-to-device * Don't drop errors * More lightweight notifications * Fix typing positions * Don't advance position on remove again unless needed * Device list updates * Advance account data position * Use limit for incremental sync * Limit fixes, amongst other things * Remove some fmt.Println * Tweaks * Re-add notifier * Fix invite position * Fixes * Notify account data without advancing PDU position in notifier * Apply account data position * Get initial position for account data * Fix position update * Fix complete sync positions * Review comments @Kegsay * Room consumer parameters
Diffstat (limited to 'syncapi/syncapi.go')
-rw-r--r--syncapi/syncapi.go32
1 files changed, 16 insertions, 16 deletions
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 0610add5..4a09940d 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -20,6 +20,7 @@ import (
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
+ "github.com/matrix-org/dendrite/eduserver/cache"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
@@ -28,8 +29,10 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/syncapi/consumers"
+ "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/routing"
"github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/sync"
)
@@ -50,57 +53,54 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to connect to sync db")
}
- pos, err := syncDB.SyncPosition(context.Background())
- if err != nil {
- logrus.WithError(err).Panicf("failed to get sync position")
- }
-
- notifier := sync.NewNotifier(pos)
- err = notifier.Load(context.Background(), syncDB)
- if err != nil {
- logrus.WithError(err).Panicf("failed to start notifier")
+ eduCache := cache.New()
+ streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache)
+ notifier := notifier.NewNotifier(streams.Latest(context.Background()))
+ if err = notifier.Load(context.Background(), syncDB); err != nil {
+ logrus.WithError(err).Panicf("failed to load notifier ")
}
- requestPool := sync.NewRequestPool(syncDB, cfg, notifier, userAPI, keyAPI, rsAPI)
+ requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
- consumer, notifier, keyAPI, rsAPI, syncDB,
+ consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider,
)
if err = keyChangeConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start key change consumer")
}
roomConsumer := consumers.NewOutputRoomEventConsumer(
- cfg, consumer, notifier, syncDB, rsAPI,
+ cfg, consumer, syncDB, notifier, streams.PDUStreamProvider,
+ streams.InviteStreamProvider, rsAPI,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
}
clientConsumer := consumers.NewOutputClientDataConsumer(
- cfg, consumer, notifier, syncDB,
+ cfg, consumer, syncDB, notifier, streams.AccountDataStreamProvider,
)
if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer")
}
typingConsumer := consumers.NewOutputTypingEventConsumer(
- cfg, consumer, notifier, syncDB,
+ cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider,
)
if err = typingConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start typing consumer")
}
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
- cfg, consumer, notifier, syncDB,
+ cfg, consumer, syncDB, notifier, streams.SendToDeviceStreamProvider,
)
if err = sendToDeviceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
}
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
- cfg, consumer, notifier, syncDB,
+ cfg, consumer, syncDB, notifier, streams.ReceiptStreamProvider,
)
if err = receiptConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start receipts consumer")