aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--appservice/appservice.go2
-rw-r--r--clientapi/clientapi.go2
-rw-r--r--eduserver/eduserver.go2
-rw-r--r--federationapi/consumers/keychange.go78
-rw-r--r--federationapi/consumers/roomserver.go5
-rw-r--r--federationapi/federationapi.go4
-rw-r--r--federationapi/storage/interface.go2
-rw-r--r--go.mod4
-rw-r--r--go.sum54
-rw-r--r--internal/consumers.go139
-rw-r--r--keyserver/api/api.go2
-rw-r--r--keyserver/consumers/cross_signing.go62
-rw-r--r--keyserver/keyserver.go4
-rw-r--r--keyserver/storage/interface.go5
-rw-r--r--keyserver/storage/postgres/key_changes_table.go5
-rw-r--r--keyserver/storage/sqlite3/key_changes_table.go5
-rw-r--r--keyserver/storage/storage_test.go6
-rw-r--r--keyserver/storage/tables/interface.go2
-rw-r--r--keyserver/types/storage.go13
-rw-r--r--roomserver/roomserver.go2
-rw-r--r--setup/jetstream/nats.go19
-rw-r--r--syncapi/consumers/keychange.go83
-rw-r--r--syncapi/internal/keychange.go6
-rw-r--r--syncapi/storage/interface.go3
-rw-r--r--syncapi/syncapi.go6
25 files changed, 155 insertions, 360 deletions
diff --git a/appservice/appservice.go b/appservice/appservice.go
index 924a609e..7e7c67f5 100644
--- a/appservice/appservice.go
+++ b/appservice/appservice.go
@@ -58,7 +58,7 @@ func NewInternalAPI(
},
},
}
- js, _, _ := jetstream.Prepare(&base.Cfg.Global.JetStream)
+ js := jetstream.Prepare(&base.Cfg.Global.JetStream)
// Create a connection to the appservice postgres DB
appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database)
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go
index 7c772125..d678ada9 100644
--- a/clientapi/clientapi.go
+++ b/clientapi/clientapi.go
@@ -49,7 +49,7 @@ func AddPublicRoutes(
extRoomsProvider api.ExtraPublicRoomsProvider,
mscCfg *config.MSCs,
) {
- js, _, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
+ js := jetstream.Prepare(&cfg.Matrix.JetStream)
syncProducer := &producers.SyncAPIProducer{
JetStream: js,
diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go
index db03001b..febcf286 100644
--- a/eduserver/eduserver.go
+++ b/eduserver/eduserver.go
@@ -42,7 +42,7 @@ func NewInternalAPI(
) api.EDUServerInputAPI {
cfg := &base.Cfg.EDUServer
- js, _, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
+ js := jetstream.Prepare(&cfg.Matrix.JetStream)
return &input.EDUServerInputAPI{
Cache: eduCache,
diff --git a/federationapi/consumers/keychange.go b/federationapi/consumers/keychange.go
index 6a737d0a..1ec9f4c1 100644
--- a/federationapi/consumers/keychange.go
+++ b/federationapi/consumers/keychange.go
@@ -17,80 +17,73 @@ package consumers
import (
"context"
"encoding/json"
- "fmt"
- "github.com/Shopify/sarama"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/storage"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
// KeyChangeConsumer consumes events that originate in key server.
type KeyChangeConsumer struct {
ctx context.Context
- consumer *internal.ContinualConsumer
+ jetstream nats.JetStreamContext
+ durable string
db storage.Database
queues *queue.OutgoingQueues
serverName gomatrixserverlib.ServerName
rsAPI roomserverAPI.RoomserverInternalAPI
+ topic string
}
// NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers.
func NewKeyChangeConsumer(
process *process.ProcessContext,
cfg *config.KeyServer,
- kafkaConsumer sarama.Consumer,
+ js nats.JetStreamContext,
queues *queue.OutgoingQueues,
store storage.Database,
rsAPI roomserverAPI.RoomserverInternalAPI,
) *KeyChangeConsumer {
- c := &KeyChangeConsumer{
- ctx: process.Context(),
- consumer: &internal.ContinualConsumer{
- Process: process,
- ComponentName: "federationapi/keychange",
- Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)),
- Consumer: kafkaConsumer,
- PartitionStore: store,
- },
+ return &KeyChangeConsumer{
+ ctx: process.Context(),
+ jetstream: js,
+ durable: cfg.Matrix.JetStream.TopicFor("FederationAPIKeyChangeConsumer"),
+ topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
queues: queues,
db: store,
serverName: cfg.Matrix.ServerName,
rsAPI: rsAPI,
}
- c.consumer.ProcessMessage = c.onMessage
-
- return c
}
// Start consuming from key servers
func (t *KeyChangeConsumer) Start() error {
- if err := t.consumer.Start(); err != nil {
- return fmt.Errorf("t.consumer.Start: %w", err)
- }
- return nil
+ return jetstream.JetStreamConsumer(
+ t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
+ nats.DeliverAll(), nats.ManualAck(),
+ )
}
// onMessage is called in response to a message received on the
// key change events topic from the key server.
-func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
+func (t *KeyChangeConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var m api.DeviceMessage
- if err := json.Unmarshal(msg.Value, &m); err != nil {
+ if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("failed to read device message from key change topic")
- return nil
+ return true
}
if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types.
- return nil
+ return true
}
switch m.Type {
case api.TypeCrossSigningUpdate:
@@ -102,9 +95,9 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}
}
-func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
+func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
if m.DeviceKeys == nil {
- return nil
+ return true
}
logger := logrus.WithField("user_id", m.UserID)
@@ -112,10 +105,10 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
_, originServerName, err := gomatrixserverlib.SplitID('@', m.UserID)
if err != nil {
logger.WithError(err).Error("Failed to extract domain from key change event")
- return nil
+ return true
}
if originServerName != t.serverName {
- return nil
+ return true
}
var queryRes roomserverAPI.QueryRoomsForUserResponse
@@ -125,13 +118,13 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
}, &queryRes)
if err != nil {
logger.WithError(err).Error("failed to calculate joined rooms for user")
- return nil
+ return true
}
// send this key change to all servers who share rooms with this user.
destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true)
if err != nil {
logger.WithError(err).Error("failed to calculate joined hosts for rooms user is in")
- return nil
+ return true
}
// Pack the EDU and marshal it
@@ -149,24 +142,26 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
Keys: m.KeyJSON,
}
if edu.Content, err = json.Marshal(event); err != nil {
- return err
+ logger.WithError(err).Error("failed to marshal EDU JSON")
+ return true
}
- logrus.Infof("Sending device list update message to %q", destinations)
- return t.queues.SendEDU(edu, t.serverName, destinations)
+ logger.Infof("Sending device list update message to %q", destinations)
+ err = t.queues.SendEDU(edu, t.serverName, destinations)
+ return err == nil
}
-func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
+func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
output := m.CrossSigningKeyUpdate
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil {
logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure")
- return nil
+ return true
}
if host != gomatrixserverlib.ServerName(t.serverName) {
// Ignore any messages that didn't originate locally, otherwise we'll
// end up parroting information we received from other servers.
- return nil
+ return true
}
logger := logrus.WithField("user_id", output.UserID)
@@ -177,13 +172,13 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
}, &queryRes)
if err != nil {
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined rooms for user")
- return nil
+ return true
}
// send this key change to all servers who share rooms with this user.
destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true)
if err != nil {
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined hosts for rooms user is in")
- return nil
+ return true
}
// Pack the EDU and marshal it
@@ -193,11 +188,12 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
}
if edu.Content, err = json.Marshal(output); err != nil {
logger.WithError(err).Error("fedsender key change consumer: failed to marshal output, dropping")
- return nil
+ return true
}
logger.Infof("Sending cross-signing update message to %q", destinations)
- return t.queues.SendEDU(edu, t.serverName, destinations)
+ err = t.queues.SendEDU(edu, t.serverName, destinations)
+ return err == nil
}
func prevID(streamID int) []int {
diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go
index ac29f930..e9862000 100644
--- a/federationapi/consumers/roomserver.go
+++ b/federationapi/consumers/roomserver.go
@@ -114,6 +114,11 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg)
}
}
+ case api.OutputTypeNewInviteEvent:
+ log.WithField("type", output.Type).Debug(
+ "received new invite, send device keys",
+ )
+
case api.OutputTypeNewInboundPeek:
if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
log.WithFields(log.Fields{
diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go
index 63387b9d..a982d800 100644
--- a/federationapi/federationapi.go
+++ b/federationapi/federationapi.go
@@ -92,7 +92,7 @@ func NewInternalAPI(
FailuresUntilBlacklist: cfg.FederationMaxRetries,
}
- js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
+ js := jetstream.Prepare(&cfg.Matrix.JetStream)
queues := queue.NewOutgoingQueues(
federationDB, base.ProcessContext,
@@ -120,7 +120,7 @@ func NewInternalAPI(
logrus.WithError(err).Panic("failed to start typing server consumer")
}
keyConsumer := consumers.NewKeyChangeConsumer(
- base.ProcessContext, &base.Cfg.KeyServer, consumer, queues, federationDB, rsAPI,
+ base.ProcessContext, &base.Cfg.KeyServer, js, queues, federationDB, rsAPI,
)
if err := keyConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start key server consumer")
diff --git a/federationapi/storage/interface.go b/federationapi/storage/interface.go
index 21a919f6..3fa8d1f7 100644
--- a/federationapi/storage/interface.go
+++ b/federationapi/storage/interface.go
@@ -19,12 +19,10 @@ import (
"github.com/matrix-org/dendrite/federationapi/storage/shared"
"github.com/matrix-org/dendrite/federationapi/types"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/gomatrixserverlib"
)
type Database interface {
- internal.PartitionStorer
gomatrixserverlib.KeyDatabase
UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error)
diff --git a/go.mod b/go.mod
index bd94713a..fc18ce07 100644
--- a/go.mod
+++ b/go.mod
@@ -11,13 +11,12 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/MFAshby/stdemuxerhook v1.0.0
github.com/Masterminds/semver/v3 v3.1.1
- github.com/S7evinK/saramajetstream v0.0.0-20210709110708-de6efc8c4a32
- github.com/Shopify/sarama v1.31.0
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/codeclysm/extract v2.2.0+incompatible
github.com/containerd/containerd v1.5.9 // indirect
github.com/docker/docker v20.10.12+incompatible
github.com/docker/go-connections v0.4.0
+ github.com/frankban/quicktest v1.14.0 // indirect
github.com/getsentry/sentry-go v0.12.0
github.com/gologme/log v1.3.0
github.com/gorilla/mux v1.8.0
@@ -74,6 +73,7 @@ require (
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
gopkg.in/h2non/bimg.v1 v1.1.5
gopkg.in/yaml.v2 v2.4.0
+ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
nhooyr.io/websocket v1.8.7
)
diff --git a/go.sum b/go.sum
index 038ccef5..3f8a99f4 100644
--- a/go.sum
+++ b/go.sum
@@ -100,17 +100,8 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko
github.com/RoaringBitmap/roaring v0.4.7/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w=
github.com/RyanCarrier/dijkstra v1.0.0/go.mod h1:5agGUBNEtUAGIANmbw09fuO3a2htPEkc1jNH01qxCWA=
github.com/RyanCarrier/dijkstra-1 v0.0.0-20170512020943-0e5801a26345/go.mod h1:OK4EvWJ441LQqGzed5NGB6vKBAE34n3z7iayPcEwr30=
-github.com/S7evinK/saramajetstream v0.0.0-20210709110708-de6efc8c4a32 h1:i3fOph9Hjleo6LbuqN9ODFxnwt7mOtYMpCGeC8qJN50=
-github.com/S7evinK/saramajetstream v0.0.0-20210709110708-de6efc8c4a32/go.mod h1:ne+jkLlzafIzaE4Q0Ze81T27dNgXe1wxovVEoAtSHTc=
github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0=
github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ=
-github.com/Shopify/sarama v1.29.0/go.mod h1:2QpgD79wpdAESqNQMxNc0KYMkycd4slxGdV3TWSVqrU=
-github.com/Shopify/sarama v1.31.0 h1:gObk7jCPutDxf+E6GA5G21noAZsi1SvP9ftCQYqpzus=
-github.com/Shopify/sarama v1.31.0/go.mod h1:BeW3gXRc/CxgAsrSly2RE9nIXUfC9ezb7QHBPVhvzjI=
-github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
-github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
-github.com/Shopify/toxiproxy/v2 v2.3.0 h1:62YkpiP4bzdhKMH+6uC5E95y608k3zDwdzuBMsnn3uQ=
-github.com/Shopify/toxiproxy/v2 v2.3.0/go.mod h1:KvQTtB6RjCJY4zqNJn7C7JDFgsG5uoHYDirfUfpIm0c=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
@@ -353,12 +344,6 @@ github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
-github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q=
-github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
-github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
-github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
-github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
-github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
@@ -379,8 +364,6 @@ github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 h1:u/UEqS66A5ckRmS4yNp
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
-github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
-github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY=
github.com/frankban/quicktest v1.0.0/go.mod h1:R98jIehRai+d1/3Hv2//jOVCTJhW1VBavT6B6CuGq2k=
github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o=
@@ -498,8 +481,6 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
-github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
-github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gologme/log v1.2.0/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U=
github.com/gologme/log v1.3.0 h1:l781G4dE+pbigClDSDzSaaYKtiueHCILUa/qSDsmHAo=
github.com/gologme/log v1.3.0/go.mod h1:yKT+DvIPdDdDoPtqFrFxheooyVmoqi0BAsw+erN3wA4=
@@ -552,8 +533,6 @@ github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33/go.mod h1:Qkdc/uu
github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
-github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
-github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
@@ -580,8 +559,6 @@ github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHh
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
-github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE=
-github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
@@ -667,18 +644,6 @@ github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsj
github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
-github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
-github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
-github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
-github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
-github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8=
-github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
-github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o=
-github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
-github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJzodkA=
-github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc=
-github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
-github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
@@ -747,10 +712,7 @@ github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
-github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
-github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
-github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw=
github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
@@ -1250,9 +1212,6 @@ github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/9
github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
-github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
-github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
-github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -1305,8 +1264,6 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
-github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
-github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
@@ -1432,7 +1389,6 @@ github.com/urfave/cli v0.0.0-20171014202726-7bc6a0acffa5/go.mod h1:70zkFmudgCuE/
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
-github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBnvPM1Su9w=
@@ -1463,11 +1419,6 @@ github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPyS
github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
-github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
-github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
-github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM=
-github.com/xdg/scram v1.0.3/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
-github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v0.0.0-20180618132009-1d523034197f/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs=
@@ -1550,7 +1501,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
-golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
@@ -1655,7 +1605,6 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
-golang.org/x/net v0.0.0-20210427231257-85d9c07bbe3a/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
@@ -1664,7 +1613,6 @@ golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20210927181540-4e4d966f7476/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211008194852-3b03d305991f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20220105145211-5b0dc2dfae98/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -1798,7 +1746,6 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
-golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7-0.20210503195748-5c7c50ebbd4f/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
@@ -2034,7 +1981,6 @@ gopkg.in/yaml.v2 v2.0.0-20170712054546-1be3d31502d6/go.mod h1:JAlM8MvJe8wmxCU4Bl
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/internal/consumers.go b/internal/consumers.go
deleted file mode 100644
index 3a4e0b7f..00000000
--- a/internal/consumers.go
+++ /dev/null
@@ -1,139 +0,0 @@
-// Copyright 2017 Vector Creations Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package internal
-
-import (
- "context"
- "fmt"
-
- "github.com/Shopify/sarama"
- "github.com/matrix-org/dendrite/internal/sqlutil"
- "github.com/matrix-org/dendrite/setup/process"
- "github.com/sirupsen/logrus"
-)
-
-// A PartitionStorer has the storage APIs needed by the consumer.
-type PartitionStorer interface {
- // PartitionOffsets returns the offsets the consumer has reached for each partition.
- PartitionOffsets(ctx context.Context, topic string) ([]sqlutil.PartitionOffset, error)
- // SetPartitionOffset records where the consumer has reached for a partition.
- SetPartitionOffset(ctx context.Context, topic string, partition int32, offset int64) error
-}
-
-// A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to
-// remember the offset it reached.
-type ContinualConsumer struct {
- // The parent context for the listener, stop consuming when this context is done
- Process *process.ProcessContext
- // The component name
- ComponentName string
- // The kafkaesque topic to consume events from.
- // This is the name used in kafka to identify the stream to consume events from.
- Topic string
- // A kafkaesque stream consumer providing the APIs for talking to the event source.
- // The interface is taken from a client library for Apache Kafka.
- // But any equivalent event streaming protocol could be made to implement the same interface.
- Consumer sarama.Consumer
- // A thing which can load and save partition offsets for a topic.
- PartitionStore PartitionStorer
- // ProcessMessage is a function which will be called for each message in the log. Return an error to
- // stop processing messages. See ErrShutdown for specific control signals.
- ProcessMessage func(msg *sarama.ConsumerMessage) error
- // ShutdownCallback is called when ProcessMessage returns ErrShutdown, after the partition has been saved.
- // It is optional.
- ShutdownCallback func()
-}
-
-// ErrShutdown can be returned from ContinualConsumer.ProcessMessage to stop the ContinualConsumer.
-var ErrShutdown = fmt.Errorf("shutdown")
-
-// Start starts the consumer consuming.
-// Starts up a goroutine for each partition in the kafka stream.
-// Returns nil once all the goroutines are started.
-// Returns an error if it can't start consuming for any of the partitions.
-func (c *ContinualConsumer) Start() error {
- _, err := c.StartOffsets()
- return err
-}
-
-// StartOffsets is the same as Start but returns the loaded offsets as well.
-func (c *ContinualConsumer) StartOffsets() ([]sqlutil.PartitionOffset, error) {
- offsets := map[int32]int64{}
-
- partitions, err := c.Consumer.Partitions(c.Topic)
- if err != nil {
- return nil, err
- }
- for _, partition := range partitions {
- // Default all the offsets to the beginning of the stream.
- offsets[partition] = sarama.OffsetOldest
- }
-
- storedOffsets, err := c.PartitionStore.PartitionOffsets(context.TODO(), c.Topic)
- if err != nil {
- return nil, err
- }
- for _, offset := range storedOffsets {
- // We've already processed events from this partition so advance the offset to where we got to.
- // ConsumePartition will start streaming from the message with the given offset (inclusive),
- // so increment 1 to avoid getting the same message a second time.
- offsets[offset.Partition] = 1 + offset.Offset
- }
-
- var partitionConsumers []sarama.PartitionConsumer
- for partition, offset := range offsets {
- pc, err := c.Consumer.ConsumePartition(c.Topic, partition, offset)
- if err != nil {
- for _, p := range partitionConsumers {
- p.Close() // nolint: errcheck
- }
- return nil, err
- }
- partitionConsumers = append(partitionConsumers, pc)
- }
- for _, pc := range partitionConsumers {
- go c.consumePartition(pc)
- if c.Process != nil {
- c.Process.ComponentStarted()
- go func(pc sarama.PartitionConsumer) {
- <-c.Process.WaitForShutdown()
- _ = pc.Close()
- c.Process.ComponentFinished()
- logrus.Infof("Stopped consumer for %q topic %q", c.ComponentName, c.Topic)
- }(pc)
- }
- }
-
- return storedOffsets, nil
-}
-
-// consumePartition consumes the room events for a single partition of the kafkaesque stream.
-func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) {
- defer pc.Close() // nolint: errcheck
- for message := range pc.Messages() {
- msgErr := c.ProcessMessage(message)
- // Advance our position in the stream so that we will start at the right position after a restart.
- if err := c.PartitionStore.SetPartitionOffset(context.TODO(), c.Topic, message.Partition, message.Offset); err != nil {
- panic(fmt.Errorf("the ContinualConsumer in %q failed to SetPartitionOffset: %w", c.ComponentName, err))
- }
- // Shutdown if we were told to do so.
- if msgErr == ErrShutdown {
- if c.ShutdownCallback != nil {
- c.ShutdownCallback()
- }
- return
- }
- }
-}
diff --git a/keyserver/api/api.go b/keyserver/api/api.go
index 0eea2f0f..3933961c 100644
--- a/keyserver/api/api.go
+++ b/keyserver/api/api.go
@@ -228,7 +228,7 @@ type QueryKeyChangesRequest struct {
// The offset of the last received key event, or sarama.OffsetOldest if this is from the beginning
Offset int64
// The inclusive offset where to track key changes up to. Messages with this offset are included in the response.
- // Use sarama.OffsetNewest if the offset is unknown (then check the response Offset to avoid racing).
+ // Use types.OffsetNewest if the offset is unknown (then check the response Offset to avoid racing).
ToOffset int64
}
diff --git a/keyserver/consumers/cross_signing.go b/keyserver/consumers/cross_signing.go
index 4b2bd4a9..a533006f 100644
--- a/keyserver/consumers/cross_signing.go
+++ b/keyserver/consumers/cross_signing.go
@@ -18,29 +18,30 @@ import (
"context"
"encoding/json"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
-
- "github.com/Shopify/sarama"
)
type OutputCrossSigningKeyUpdateConsumer struct {
- eduServerConsumer *internal.ContinualConsumer
- keyDB storage.Database
- keyAPI api.KeyInternalAPI
- serverName string
+ ctx context.Context
+ keyDB storage.Database
+ keyAPI api.KeyInternalAPI
+ serverName string
+ jetstream nats.JetStreamContext
+ durable string
+ topic string
}
func NewOutputCrossSigningKeyUpdateConsumer(
process *process.ProcessContext,
cfg *config.Dendrite,
- kafkaConsumer sarama.Consumer,
+ js nats.JetStreamContext,
keyDB storage.Database,
keyAPI api.KeyInternalAPI,
) *OutputCrossSigningKeyUpdateConsumer {
@@ -48,60 +49,58 @@ func NewOutputCrossSigningKeyUpdateConsumer(
// topic. We will only produce events where the UserID matches our server name,
// and we will only consume events where the UserID does NOT match our server
// name (because the update came from a remote server).
- consumer := internal.ContinualConsumer{
- Process: process,
- ComponentName: "keyserver/keyserver",
- Topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
- Consumer: kafkaConsumer,
- PartitionStore: keyDB,
- }
s := &OutputCrossSigningKeyUpdateConsumer{
- eduServerConsumer: &consumer,
- keyDB: keyDB,
- keyAPI: keyAPI,
- serverName: string(cfg.Global.ServerName),
+ ctx: process.Context(),
+ keyDB: keyDB,
+ jetstream: js,
+ durable: cfg.Global.JetStream.Durable("KeyServerCrossSigningConsumer"),
+ topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
+ keyAPI: keyAPI,
+ serverName: string(cfg.Global.ServerName),
}
- consumer.ProcessMessage = s.onMessage
return s
}
func (s *OutputCrossSigningKeyUpdateConsumer) Start() error {
- return s.eduServerConsumer.Start()
+ return jetstream.JetStreamConsumer(
+ s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
+ nats.DeliverAll(), nats.ManualAck(),
+ )
}
// onMessage is called in response to a message received on the
// key change events topic from the key server.
-func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
+func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var m api.DeviceMessage
- if err := json.Unmarshal(msg.Value, &m); err != nil {
+ if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("failed to read device message from key change topic")
- return nil
+ return true
}
if m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types.
- return nil
+ return true
}
switch m.Type {
case api.TypeCrossSigningUpdate:
return t.onCrossSigningMessage(m)
default:
- return nil
+ return true
}
}
-func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
+func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
output := m.CrossSigningKeyUpdate
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil {
logrus.WithError(err).Errorf("eduserver output log: user ID parse failure")
- return nil
+ return true
}
if host == gomatrixserverlib.ServerName(s.serverName) {
// Ignore any messages that contain information about our own users, as
// they already originated from this server.
- return nil
+ return true
}
uploadReq := &api.PerformUploadDeviceKeysRequest{
UserID: output.UserID,
@@ -114,5 +113,8 @@ func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.Device
}
uploadRes := &api.PerformUploadDeviceKeysResponse{}
s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes)
- return uploadRes.Error
+ if uploadRes.Error != nil {
+ return false
+ }
+ return true
}
diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go
index 8cc50ea0..61ccc030 100644
--- a/keyserver/keyserver.go
+++ b/keyserver/keyserver.go
@@ -40,7 +40,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
func NewInternalAPI(
base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient,
) api.KeyInternalAPI {
- js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
+ js := jetstream.Prepare(&cfg.Matrix.JetStream)
db, err := storage.NewDatabase(&cfg.Database)
if err != nil {
@@ -66,7 +66,7 @@ func NewInternalAPI(
}()
keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer(
- base.ProcessContext, base.Cfg, consumer, db, ap,
+ base.ProcessContext, base.Cfg, js, db, ap,
)
if err := keyconsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer")
diff --git a/keyserver/storage/interface.go b/keyserver/storage/interface.go
index 87feae47..0110860e 100644
--- a/keyserver/storage/interface.go
+++ b/keyserver/storage/interface.go
@@ -18,15 +18,12 @@ import (
"context"
"encoding/json"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
type Database interface {
- internal.PartitionStorer
-
// ExistingOneTimeKeys returns a map of keyIDWithAlgorithm to key JSON for the given parameters. If no keys exist with this combination
// of user/device/key/algorithm 4-uple then it is omitted from the map. Returns an error when failing to communicate with the database.
ExistingOneTimeKeys(ctx context.Context, userID, deviceID string, keyIDsWithAlgorithms []string) (map[string]json.RawMessage, error)
@@ -71,7 +68,7 @@ type Database interface {
StoreKeyChange(ctx context.Context, userID string) (int64, error)
// KeyChanges returns a list of user IDs who have modified their keys from the offset given (exclusive) to the offset given (inclusive).
- // A to offset of sarama.OffsetNewest means no upper limit.
+ // A to offset of types.OffsetNewest means no upper limit.
// Returns the offset of the latest key change.
KeyChanges(ctx context.Context, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error)
diff --git a/keyserver/storage/postgres/key_changes_table.go b/keyserver/storage/postgres/key_changes_table.go
index 20d227c2..f93a94bd 100644
--- a/keyserver/storage/postgres/key_changes_table.go
+++ b/keyserver/storage/postgres/key_changes_table.go
@@ -17,9 +17,7 @@ package postgres
import (
"context"
"database/sql"
- "math"
- "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
@@ -78,9 +76,6 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, userID strin
func (s *keyChangesStatements) SelectKeyChanges(
ctx context.Context, fromOffset, toOffset int64,
) (userIDs []string, latestOffset int64, err error) {
- if toOffset == sarama.OffsetNewest {
- toOffset = math.MaxInt64
- }
latestOffset = fromOffset
rows, err := s.selectKeyChangesStmt.QueryContext(ctx, fromOffset, toOffset)
if err != nil {
diff --git a/keyserver/storage/sqlite3/key_changes_table.go b/keyserver/storage/sqlite3/key_changes_table.go
index d43c15ca..e035e8c9 100644
--- a/keyserver/storage/sqlite3/key_changes_table.go
+++ b/keyserver/storage/sqlite3/key_changes_table.go
@@ -17,9 +17,7 @@ package sqlite3
import (
"context"
"database/sql"
- "math"
- "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
@@ -76,9 +74,6 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, userID strin
func (s *keyChangesStatements) SelectKeyChanges(
ctx context.Context, fromOffset, toOffset int64,
) (userIDs []string, latestOffset int64, err error) {
- if toOffset == sarama.OffsetNewest {
- toOffset = math.MaxInt64
- }
latestOffset = fromOffset
rows, err := s.selectKeyChangesStmt.QueryContext(ctx, fromOffset, toOffset)
if err != nil {
diff --git a/keyserver/storage/storage_test.go b/keyserver/storage/storage_test.go
index 2f8cf809..c4c99d8c 100644
--- a/keyserver/storage/storage_test.go
+++ b/keyserver/storage/storage_test.go
@@ -9,8 +9,8 @@ import (
"reflect"
"testing"
- "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/matrix-org/dendrite/keyserver/types"
"github.com/matrix-org/dendrite/setup/config"
)
@@ -50,7 +50,7 @@ func TestKeyChanges(t *testing.T) {
MustNotError(t, err)
deviceChangeIDC, err := db.StoreKeyChange(ctx, "@charlie:localhost")
MustNotError(t, err)
- userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDB, sarama.OffsetNewest)
+ userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDB, types.OffsetNewest)
if err != nil {
t.Fatalf("Failed to KeyChanges: %s", err)
}
@@ -74,7 +74,7 @@ func TestKeyChangesNoDupes(t *testing.T) {
}
deviceChangeID, err := db.StoreKeyChange(ctx, "@alice:localhost")
MustNotError(t, err)
- userIDs, latest, err := db.KeyChanges(ctx, 0, sarama.OffsetNewest)
+ userIDs, latest, err := db.KeyChanges(ctx, 0, types.OffsetNewest)
if err != nil {
t.Fatalf("Failed to KeyChanges: %s", err)
}
diff --git a/keyserver/storage/tables/interface.go b/keyserver/storage/tables/interface.go
index 0d94c94c..e44757e1 100644
--- a/keyserver/storage/tables/interface.go
+++ b/keyserver/storage/tables/interface.go
@@ -46,7 +46,7 @@ type DeviceKeys interface {
type KeyChanges interface {
InsertKeyChange(ctx context.Context, userID string) (int64, error)
// SelectKeyChanges returns the set (de-duplicated) of users who have changed their keys between the two offsets.
- // Results are exclusive of fromOffset and inclusive of toOffset. A toOffset of sarama.OffsetNewest means no upper offset.
+ // Results are exclusive of fromOffset and inclusive of toOffset. A toOffset of types.OffsetNewest means no upper offset.
SelectKeyChanges(ctx context.Context, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error)
Prepare() error
diff --git a/keyserver/types/storage.go b/keyserver/types/storage.go
index 3480ec65..7fb90454 100644
--- a/keyserver/types/storage.go
+++ b/keyserver/types/storage.go
@@ -14,7 +14,18 @@
package types
-import "github.com/matrix-org/gomatrixserverlib"
+import (
+ "math"
+
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const (
+ // OffsetNewest tells e.g. the database to get the most current data
+ OffsetNewest int64 = math.MaxInt64
+ // OffsetOldest tells e.g. the database to get the oldest data
+ OffsetOldest int64 = 0
+)
// KeyTypePurposeToInt maps a purpose to an integer, which is used in the
// database to reduce the amount of space taken up by this column.
diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go
index 669957be..e1b84b80 100644
--- a/roomserver/roomserver.go
+++ b/roomserver/roomserver.go
@@ -50,7 +50,7 @@ func NewInternalAPI(
logrus.WithError(err).Panicf("failed to connect to room server db")
}
- js, _, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
+ js := jetstream.Prepare(&cfg.Matrix.JetStream)
return internal.NewRoomserverAPI(
cfg, roomserverDB, js,
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index 5d7937b5..77ad2b72 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -5,20 +5,17 @@ import (
"sync"
"time"
- "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/setup/config"
"github.com/sirupsen/logrus"
- saramajs "github.com/S7evinK/saramajetstream"
natsserver "github.com/nats-io/nats-server/v2/server"
- "github.com/nats-io/nats.go"
natsclient "github.com/nats-io/nats.go"
)
var natsServer *natsserver.Server
var natsServerMutex sync.Mutex
-func Prepare(cfg *config.JetStream) (nats.JetStreamContext, sarama.Consumer, sarama.SyncProducer) {
+func Prepare(cfg *config.JetStream) natsclient.JetStreamContext {
// check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 {
return setupNATS(cfg, nil)
@@ -52,20 +49,20 @@ func Prepare(cfg *config.JetStream) (nats.JetStreamContext, sarama.Consumer, sar
return setupNATS(cfg, nc)
}
-func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContext, sarama.Consumer, sarama.SyncProducer) {
+func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) natsclient.JetStreamContext {
if nc == nil {
var err error
- nc, err = nats.Connect(strings.Join(cfg.Addresses, ","))
+ nc, err = natsclient.Connect(strings.Join(cfg.Addresses, ","))
if err != nil {
logrus.WithError(err).Panic("Unable to connect to NATS")
- return nil, nil, nil
+ return nil
}
}
s, err := nc.JetStream()
if err != nil {
logrus.WithError(err).Panic("Unable to get JetStream context")
- return nil, nil, nil
+ return nil
}
for _, stream := range streams { // streams are defined in streams.go
@@ -80,7 +77,7 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContex
// If we're trying to keep everything in memory (e.g. unit tests)
// then overwrite the storage policy.
if cfg.InMemory {
- stream.Storage = nats.MemoryStorage
+ stream.Storage = natsclient.MemoryStorage
}
// Namespace the streams without modifying the original streams
@@ -93,7 +90,5 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContex
}
}
- consumer := saramajs.NewJetStreamConsumer(nc, s, "")
- producer := saramajs.NewJetStreamProducer(nc, s, "")
- return s, consumer, producer
+ return s
}
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index 97685cc0..e806f76e 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -18,84 +18,81 @@ import (
"context"
"encoding/json"
- "github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
// OutputKeyChangeEventConsumer consumes events that originated in the key server.
type OutputKeyChangeEventConsumer struct {
- ctx context.Context
- keyChangeConsumer *internal.ContinualConsumer
- db storage.Database
- notifier *notifier.Notifier
- stream types.StreamProvider
- serverName gomatrixserverlib.ServerName // our server name
- rsAPI roomserverAPI.RoomserverInternalAPI
- keyAPI api.KeyInternalAPI
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ topic string
+ db storage.Database
+ notifier *notifier.Notifier
+ stream types.StreamProvider
+ serverName gomatrixserverlib.ServerName // our server name
+ rsAPI roomserverAPI.RoomserverInternalAPI
+ keyAPI api.KeyInternalAPI
}
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
// Call Start() to begin consuming from the key server.
func NewOutputKeyChangeEventConsumer(
process *process.ProcessContext,
- serverName gomatrixserverlib.ServerName,
+ cfg *config.SyncAPI,
topic string,
- kafkaConsumer sarama.Consumer,
+ js nats.JetStreamContext,
keyAPI api.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputKeyChangeEventConsumer {
-
- consumer := internal.ContinualConsumer{
- Process: process,
- ComponentName: "syncapi/keychange",
- Topic: topic,
- Consumer: kafkaConsumer,
- PartitionStore: store,
- }
-
s := &OutputKeyChangeEventConsumer{
- ctx: process.Context(),
- keyChangeConsumer: &consumer,
- db: store,
- serverName: serverName,
- keyAPI: keyAPI,
- rsAPI: rsAPI,
- notifier: notifier,
- stream: stream,
+ ctx: process.Context(),
+ jetstream: js,
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIKeyChangeConsumer"),
+ topic: topic,
+ db: store,
+ serverName: cfg.Matrix.ServerName,
+ keyAPI: keyAPI,
+ rsAPI: rsAPI,
+ notifier: notifier,
+ stream: stream,
}
- consumer.ProcessMessage = s.onMessage
-
return s
}
// Start consuming from the key server
func (s *OutputKeyChangeEventConsumer) Start() error {
- return s.keyChangeConsumer.Start()
+ return jetstream.JetStreamConsumer(
+ s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
+ nats.DeliverAll(), nats.ManualAck(),
+ )
}
-func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
+func (s *OutputKeyChangeEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var m api.DeviceMessage
- if err := json.Unmarshal(msg.Value, &m); err != nil {
+ if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("failed to read device message from key change topic")
- return nil
+ return true
}
if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types.
- return nil
+ return true
}
switch m.Type {
case api.TypeCrossSigningUpdate:
@@ -107,9 +104,9 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
}
}
-func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, deviceChangeID int64) error {
+func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, deviceChangeID int64) bool {
if m.DeviceKeys == nil {
- return nil
+ return true
}
output := m.DeviceKeys
// work out who we need to notify about the new key
@@ -120,7 +117,7 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, d
if err != nil {
logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
sentry.CaptureException(err)
- return err
+ return true
}
// make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1
@@ -131,10 +128,10 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, d
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
}
- return nil
+ return true
}
-func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, deviceChangeID int64) error {
+func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, deviceChangeID int64) bool {
output := m.CrossSigningKeyUpdate
// work out who we need to notify about the new key
var queryRes roomserverAPI.QuerySharedUsersResponse
@@ -144,7 +141,7 @@ func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage
if err != nil {
logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
sentry.CaptureException(err)
- return err
+ return true
}
// make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1
@@ -155,5 +152,5 @@ func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
}
- return nil
+ return true
}
diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go
index 41efd4a0..fa1064b7 100644
--- a/syncapi/internal/keychange.go
+++ b/syncapi/internal/keychange.go
@@ -18,8 +18,8 @@ import (
"context"
"strings"
- "github.com/Shopify/sarama"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
+ keytypes "github.com/matrix-org/dendrite/keyserver/types"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
@@ -64,8 +64,8 @@ func DeviceListCatchup(
}
// now also track users who we already share rooms with but who have updated their devices between the two tokens
- offset := sarama.OffsetOldest
- toOffset := sarama.OffsetNewest
+ offset := keytypes.OffsetOldest
+ toOffset := keytypes.OffsetNewest
if to > 0 && to > from {
toOffset = int64(to)
}
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index 9cff4cad..b464ad9c 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -19,7 +19,6 @@ import (
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
@@ -27,8 +26,6 @@ import (
)
type Database interface {
- internal.PartitionStorer
-
MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error)
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 39bc233a..72462459 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -48,7 +48,7 @@ func AddPublicRoutes(
federation *gomatrixserverlib.FederationClient,
cfg *config.SyncAPI,
) {
- js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
+ js := jetstream.Prepare(&cfg.Matrix.JetStream)
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database)
if err != nil {
@@ -65,8 +65,8 @@ func AddPublicRoutes(
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
- process, cfg.Matrix.ServerName, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
- consumer, keyAPI, rsAPI, syncDB, notifier,
+ process, cfg, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
+ js, keyAPI, rsAPI, syncDB, notifier,
streams.DeviceListStreamProvider,
)
if err = keyChangeConsumer.Start(); err != nil {