aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMark Haines <mjark@negativecurvature.net>2017-08-18 11:33:10 +0100
committerGitHub <noreply@github.com>2017-08-18 11:33:10 +0100
commit877ea5cb62683e1476e72c383aa3e8450190d259 (patch)
tree77eb9652bd34dd6b176f191624d69ab023e1194b /src
parentba8b5d8bf950d3e384c67b30de22ae84bb604af1 (diff)
Remove StopProcessingAfter from the roomserver consumer as it is unused (#186)
Diffstat (limited to 'src')
-rw-r--r--src/github.com/matrix-org/dendrite/roomserver/input/input.go24
1 files changed, 0 insertions, 24 deletions
diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/input.go b/src/github.com/matrix-org/dendrite/roomserver/input/input.go
index c8ac58d3..210abfa2 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/input/input.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/input/input.go
@@ -17,9 +17,6 @@ package input
import (
"encoding/json"
- "fmt"
- "sync/atomic"
-
"net/http"
"github.com/matrix-org/dendrite/common"
@@ -35,15 +32,6 @@ type RoomserverInputAPI struct {
// The kafkaesque topic to output new room events to.
// This is the name used in kafka to identify the stream to write events to.
OutputRoomEventTopic string
- // If non-nil then the API will stop processing messages after this
- // many messages and will shutdown. Malformed messages are not in the count.
- StopProcessingAfter *int64
- // If not-nil then the API will call this to shutdown the server.
- // If this is nil then the API will continue to process messsages even
- // though StopProcessingAfter has been reached.
- ShutdownCallback func(reason string)
- // How many messages the consumer has processed.
- processed int64
}
// WriteOutputEvents implements OutputRoomEventWriter
@@ -72,18 +60,6 @@ func (r *RoomserverInputAPI) InputRoomEvents(
if err := processRoomEvent(r.DB, r, request.InputRoomEvents[i]); err != nil {
return err
}
- // Update the number of processed messages using atomic addition because it is accessed from multiple goroutines.
- processed := atomic.AddInt64(&r.processed, 1)
- // Check if we should stop processing.
- // Note that since we have multiple goroutines it's quite likely that we'll overshoot by a few messages.
- // If we try to stop processing after M message and we have N goroutines then we will process somewhere
- // between M and (N + M) messages because the N goroutines could all try to process what they think will be the
- // last message. We could be more careful here but this is good enough for getting rough benchmarks.
- if r.StopProcessingAfter != nil && processed >= int64(*r.StopProcessingAfter) {
- if r.ShutdownCallback != nil {
- r.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", r.processed))
- }
- }
}
return nil
}