diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-09-03 15:22:16 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-03 15:22:16 +0100 |
commit | 6150de6cb3611ffc61ce10ed6714f65e51e38e78 (patch) | |
tree | 7c89defb2634b19497f4911e9851f35d8b101af7 /roomserver/internal/input | |
parent | 74743ac8ae3cc439862acd15d13ba4123d745598 (diff) |
FIFO ordering of input events (#1386)
* Initial FIFOing of roomserver inputs
* Remove EventID response from api.InputRoomEventsResponse
* Don't send back event ID unnecessarily
* Fix ordering hopefully
* Reduce copies, use buffered task channel to reduce contention on other rooms
* Fix error handling
Diffstat (limited to 'roomserver/internal/input')
-rw-r--r-- | roomserver/internal/input/input.go | 82 | ||||
-rw-r--r-- | roomserver/internal/input/input_events.go | 4 |
2 files changed, 76 insertions, 10 deletions
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 87bdc5db..7a44ff42 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -19,12 +19,14 @@ import ( "context" "encoding/json" "sync" + "time" "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) type Inputer struct { @@ -33,7 +35,36 @@ type Inputer struct { ServerName gomatrixserverlib.ServerName OutputRoomEventTopic string - mutexes sync.Map // room ID -> *sync.Mutex, protects calls to processRoomEvent + workers sync.Map // room ID -> *inputWorker +} + +type inputTask struct { + ctx context.Context + event *api.InputRoomEvent + wg *sync.WaitGroup + err error // written back by worker, only safe to read when all tasks are done +} + +type inputWorker struct { + r *Inputer + running atomic.Bool + input chan *inputTask +} + +func (w *inputWorker) start() { + if !w.running.CAS(false, true) { + return + } + defer w.running.Store(false) + for { + select { + case task := <-w.input: + _, task.err = w.r.processRoomEvent(task.ctx, task.event) + task.wg.Done() + case <-time.After(time.Second * 5): + return + } + } } // WriteOutputEvents implements OutputRoomEventWriter @@ -73,19 +104,54 @@ func (r *Inputer) InputRoomEvents( ctx context.Context, request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, -) (err error) { +) error { + // Create a wait group. Each task that we dispatch will call Done on + // this wait group so that we know when all of our events have been + // processed. + wg := &sync.WaitGroup{} + wg.Add(len(request.InputRoomEvents)) + tasks := make([]*inputTask, len(request.InputRoomEvents)) + for i, e := range request.InputRoomEvents { + // Work out if we are running per-room workers or if we're just doing + // it on a global basis (e.g. SQLite). roomID := "global" if r.DB.SupportsConcurrentRoomInputs() { roomID = e.Event.RoomID() } - mutex, _ := r.mutexes.LoadOrStore(roomID, &sync.Mutex{}) - mutex.(*sync.Mutex).Lock() - if response.EventID, err = r.processRoomEvent(ctx, request.InputRoomEvents[i]); err != nil { - mutex.(*sync.Mutex).Unlock() - return err + + // Look up the worker, or create it if it doesn't exist. This channel + // is buffered to reduce the chance that we'll be blocked by another + // room - the channel will be quite small as it's just pointer types. + w, _ := r.workers.LoadOrStore(roomID, &inputWorker{ + r: r, + input: make(chan *inputTask, 10), + }) + worker := w.(*inputWorker) + + // Create a task. This contains the input event and a reference to + // the wait group, so that the worker can notify us when this specific + // task has been finished. + tasks[i] = &inputTask{ + ctx: ctx, + event: &request.InputRoomEvents[i], + wg: wg, + } + + // Send the task to the worker. + go worker.start() + worker.input <- tasks[i] + } + + // Wait for all of the workers to return results about our tasks. + wg.Wait() + + // If any of the tasks returned an error, we should probably report + // that back to the caller. + for _, task := range tasks { + if task.err != nil { + return task.err } - mutex.(*sync.Mutex).Unlock() } return nil } diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 69f51f4b..6ee679da 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -38,7 +38,7 @@ import ( // nolint:gocyclo func (r *Inputer) processRoomEvent( ctx context.Context, - input api.InputRoomEvent, + input *api.InputRoomEvent, ) (eventID string, err error) { // Parse and validate the event JSON headered := input.Event @@ -143,7 +143,7 @@ func (r *Inputer) processRoomEvent( func (r *Inputer) calculateAndSetState( ctx context.Context, - input api.InputRoomEvent, + input *api.InputRoomEvent, roomInfo types.RoomInfo, stateAtEvent *types.StateAtEvent, event gomatrixserverlib.Event, |