aboutsummaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorS7evinK <2353100+S7evinK@users.noreply.github.com>2022-02-04 14:08:13 +0100
committerGitHub <noreply@github.com>2022-02-04 13:08:13 +0000
commit9de7efa0b095f40457f0e348632c77326dcb4a42 (patch)
treec543823b7d4ce25371b975a88bc5270c3a9d7352 /internal
parent532f445c4e31396fc3aa4f52e0e078cd499bc39a (diff)
Remove sarama/saramajetstream dependencies (#2138)
* Remove dependency on saramajetstream & sarama Signed-off-by: Till Faelligen <tfaelligen@gmail.com> * Remove internal.ContinualConsumer from federationapi * Remove internal.ContinualConsumer from syncapi * Remove internal.ContinualConsumer from keyserver * Move to new Prepare function * Remove saramajetstream & sarama dependency * Delete unneeded file * Remove duplicate import * Log error instead of silently irgnoring it * Move `OffsetNewest` and `OffsetOldest` into keyserver types, change them to be more sane values * Fix comments Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'internal')
-rw-r--r--internal/consumers.go139
1 files changed, 0 insertions, 139 deletions
diff --git a/internal/consumers.go b/internal/consumers.go
deleted file mode 100644
index 3a4e0b7f..00000000
--- a/internal/consumers.go
+++ /dev/null
@@ -1,139 +0,0 @@
-// Copyright 2017 Vector Creations Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package internal
-
-import (
- "context"
- "fmt"
-
- "github.com/Shopify/sarama"
- "github.com/matrix-org/dendrite/internal/sqlutil"
- "github.com/matrix-org/dendrite/setup/process"
- "github.com/sirupsen/logrus"
-)
-
-// A PartitionStorer has the storage APIs needed by the consumer.
-type PartitionStorer interface {
- // PartitionOffsets returns the offsets the consumer has reached for each partition.
- PartitionOffsets(ctx context.Context, topic string) ([]sqlutil.PartitionOffset, error)
- // SetPartitionOffset records where the consumer has reached for a partition.
- SetPartitionOffset(ctx context.Context, topic string, partition int32, offset int64) error
-}
-
-// A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to
-// remember the offset it reached.
-type ContinualConsumer struct {
- // The parent context for the listener, stop consuming when this context is done
- Process *process.ProcessContext
- // The component name
- ComponentName string
- // The kafkaesque topic to consume events from.
- // This is the name used in kafka to identify the stream to consume events from.
- Topic string
- // A kafkaesque stream consumer providing the APIs for talking to the event source.
- // The interface is taken from a client library for Apache Kafka.
- // But any equivalent event streaming protocol could be made to implement the same interface.
- Consumer sarama.Consumer
- // A thing which can load and save partition offsets for a topic.
- PartitionStore PartitionStorer
- // ProcessMessage is a function which will be called for each message in the log. Return an error to
- // stop processing messages. See ErrShutdown for specific control signals.
- ProcessMessage func(msg *sarama.ConsumerMessage) error
- // ShutdownCallback is called when ProcessMessage returns ErrShutdown, after the partition has been saved.
- // It is optional.
- ShutdownCallback func()
-}
-
-// ErrShutdown can be returned from ContinualConsumer.ProcessMessage to stop the ContinualConsumer.
-var ErrShutdown = fmt.Errorf("shutdown")
-
-// Start starts the consumer consuming.
-// Starts up a goroutine for each partition in the kafka stream.
-// Returns nil once all the goroutines are started.
-// Returns an error if it can't start consuming for any of the partitions.
-func (c *ContinualConsumer) Start() error {
- _, err := c.StartOffsets()
- return err
-}
-
-// StartOffsets is the same as Start but returns the loaded offsets as well.
-func (c *ContinualConsumer) StartOffsets() ([]sqlutil.PartitionOffset, error) {
- offsets := map[int32]int64{}
-
- partitions, err := c.Consumer.Partitions(c.Topic)
- if err != nil {
- return nil, err
- }
- for _, partition := range partitions {
- // Default all the offsets to the beginning of the stream.
- offsets[partition] = sarama.OffsetOldest
- }
-
- storedOffsets, err := c.PartitionStore.PartitionOffsets(context.TODO(), c.Topic)
- if err != nil {
- return nil, err
- }
- for _, offset := range storedOffsets {
- // We've already processed events from this partition so advance the offset to where we got to.
- // ConsumePartition will start streaming from the message with the given offset (inclusive),
- // so increment 1 to avoid getting the same message a second time.
- offsets[offset.Partition] = 1 + offset.Offset
- }
-
- var partitionConsumers []sarama.PartitionConsumer
- for partition, offset := range offsets {
- pc, err := c.Consumer.ConsumePartition(c.Topic, partition, offset)
- if err != nil {
- for _, p := range partitionConsumers {
- p.Close() // nolint: errcheck
- }
- return nil, err
- }
- partitionConsumers = append(partitionConsumers, pc)
- }
- for _, pc := range partitionConsumers {
- go c.consumePartition(pc)
- if c.Process != nil {
- c.Process.ComponentStarted()
- go func(pc sarama.PartitionConsumer) {
- <-c.Process.WaitForShutdown()
- _ = pc.Close()
- c.Process.ComponentFinished()
- logrus.Infof("Stopped consumer for %q topic %q", c.ComponentName, c.Topic)
- }(pc)
- }
- }
-
- return storedOffsets, nil
-}
-
-// consumePartition consumes the room events for a single partition of the kafkaesque stream.
-func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) {
- defer pc.Close() // nolint: errcheck
- for message := range pc.Messages() {
- msgErr := c.ProcessMessage(message)
- // Advance our position in the stream so that we will start at the right position after a restart.
- if err := c.PartitionStore.SetPartitionOffset(context.TODO(), c.Topic, message.Partition, message.Offset); err != nil {
- panic(fmt.Errorf("the ContinualConsumer in %q failed to SetPartitionOffset: %w", c.ComponentName, err))
- }
- // Shutdown if we were told to do so.
- if msgErr == ErrShutdown {
- if c.ShutdownCallback != nil {
- c.ShutdownCallback()
- }
- return
- }
- }
-}