aboutsummaryrefslogtreecommitdiff
path: root/roomserver/internal/input
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-09-03 15:22:16 +0100
committerGitHub <noreply@github.com>2020-09-03 15:22:16 +0100
commit6150de6cb3611ffc61ce10ed6714f65e51e38e78 (patch)
tree7c89defb2634b19497f4911e9851f35d8b101af7 /roomserver/internal/input
parent74743ac8ae3cc439862acd15d13ba4123d745598 (diff)
FIFO ordering of input events (#1386)
* Initial FIFOing of roomserver inputs * Remove EventID response from api.InputRoomEventsResponse * Don't send back event ID unnecessarily * Fix ordering hopefully * Reduce copies, use buffered task channel to reduce contention on other rooms * Fix error handling
Diffstat (limited to 'roomserver/internal/input')
-rw-r--r--roomserver/internal/input/input.go82
-rw-r--r--roomserver/internal/input/input_events.go4
2 files changed, 76 insertions, 10 deletions
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index 87bdc5db..7a44ff42 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -19,12 +19,14 @@ import (
"context"
"encoding/json"
"sync"
+ "time"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
+ "go.uber.org/atomic"
)
type Inputer struct {
@@ -33,7 +35,36 @@ type Inputer struct {
ServerName gomatrixserverlib.ServerName
OutputRoomEventTopic string
- mutexes sync.Map // room ID -> *sync.Mutex, protects calls to processRoomEvent
+ workers sync.Map // room ID -> *inputWorker
+}
+
+type inputTask struct {
+ ctx context.Context
+ event *api.InputRoomEvent
+ wg *sync.WaitGroup
+ err error // written back by worker, only safe to read when all tasks are done
+}
+
+type inputWorker struct {
+ r *Inputer
+ running atomic.Bool
+ input chan *inputTask
+}
+
+func (w *inputWorker) start() {
+ if !w.running.CAS(false, true) {
+ return
+ }
+ defer w.running.Store(false)
+ for {
+ select {
+ case task := <-w.input:
+ _, task.err = w.r.processRoomEvent(task.ctx, task.event)
+ task.wg.Done()
+ case <-time.After(time.Second * 5):
+ return
+ }
+ }
}
// WriteOutputEvents implements OutputRoomEventWriter
@@ -73,19 +104,54 @@ func (r *Inputer) InputRoomEvents(
ctx context.Context,
request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse,
-) (err error) {
+) error {
+ // Create a wait group. Each task that we dispatch will call Done on
+ // this wait group so that we know when all of our events have been
+ // processed.
+ wg := &sync.WaitGroup{}
+ wg.Add(len(request.InputRoomEvents))
+ tasks := make([]*inputTask, len(request.InputRoomEvents))
+
for i, e := range request.InputRoomEvents {
+ // Work out if we are running per-room workers or if we're just doing
+ // it on a global basis (e.g. SQLite).
roomID := "global"
if r.DB.SupportsConcurrentRoomInputs() {
roomID = e.Event.RoomID()
}
- mutex, _ := r.mutexes.LoadOrStore(roomID, &sync.Mutex{})
- mutex.(*sync.Mutex).Lock()
- if response.EventID, err = r.processRoomEvent(ctx, request.InputRoomEvents[i]); err != nil {
- mutex.(*sync.Mutex).Unlock()
- return err
+
+ // Look up the worker, or create it if it doesn't exist. This channel
+ // is buffered to reduce the chance that we'll be blocked by another
+ // room - the channel will be quite small as it's just pointer types.
+ w, _ := r.workers.LoadOrStore(roomID, &inputWorker{
+ r: r,
+ input: make(chan *inputTask, 10),
+ })
+ worker := w.(*inputWorker)
+
+ // Create a task. This contains the input event and a reference to
+ // the wait group, so that the worker can notify us when this specific
+ // task has been finished.
+ tasks[i] = &inputTask{
+ ctx: ctx,
+ event: &request.InputRoomEvents[i],
+ wg: wg,
+ }
+
+ // Send the task to the worker.
+ go worker.start()
+ worker.input <- tasks[i]
+ }
+
+ // Wait for all of the workers to return results about our tasks.
+ wg.Wait()
+
+ // If any of the tasks returned an error, we should probably report
+ // that back to the caller.
+ for _, task := range tasks {
+ if task.err != nil {
+ return task.err
}
- mutex.(*sync.Mutex).Unlock()
}
return nil
}
diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go
index 69f51f4b..6ee679da 100644
--- a/roomserver/internal/input/input_events.go
+++ b/roomserver/internal/input/input_events.go
@@ -38,7 +38,7 @@ import (
// nolint:gocyclo
func (r *Inputer) processRoomEvent(
ctx context.Context,
- input api.InputRoomEvent,
+ input *api.InputRoomEvent,
) (eventID string, err error) {
// Parse and validate the event JSON
headered := input.Event
@@ -143,7 +143,7 @@ func (r *Inputer) processRoomEvent(
func (r *Inputer) calculateAndSetState(
ctx context.Context,
- input api.InputRoomEvent,
+ input *api.InputRoomEvent,
roomInfo types.RoomInfo,
stateAtEvent *types.StateAtEvent,
event gomatrixserverlib.Event,