diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-09-03 15:22:16 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-03 15:22:16 +0100 |
commit | 6150de6cb3611ffc61ce10ed6714f65e51e38e78 (patch) | |
tree | 7c89defb2634b19497f4911e9851f35d8b101af7 /roomserver | |
parent | 74743ac8ae3cc439862acd15d13ba4123d745598 (diff) |
FIFO ordering of input events (#1386)
* Initial FIFOing of roomserver inputs
* Remove EventID response from api.InputRoomEventsResponse
* Don't send back event ID unnecessarily
* Fix ordering hopefully
* Reduce copies, use buffered task channel to reduce contention on other rooms
* Fix error handling
Diffstat (limited to 'roomserver')
-rw-r--r-- | roomserver/api/input.go | 1 | ||||
-rw-r--r-- | roomserver/api/wrapper.go | 11 | ||||
-rw-r--r-- | roomserver/internal/input/input.go | 82 | ||||
-rw-r--r-- | roomserver/internal/input/input_events.go | 4 | ||||
-rw-r--r-- | roomserver/roomserver_test.go | 3 |
5 files changed, 81 insertions, 20 deletions
diff --git a/roomserver/api/input.go b/roomserver/api/input.go index 05c981df..73c4994a 100644 --- a/roomserver/api/input.go +++ b/roomserver/api/input.go @@ -83,5 +83,4 @@ type InputRoomEventsRequest struct { // InputRoomEventsResponse is a response to InputRoomEvents type InputRoomEventsResponse struct { - EventID string `json:"event_id"` } diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go index 207c12c8..16f5e8e1 100644 --- a/roomserver/api/wrapper.go +++ b/roomserver/api/wrapper.go @@ -26,7 +26,7 @@ import ( func SendEvents( ctx context.Context, rsAPI RoomserverInternalAPI, events []gomatrixserverlib.HeaderedEvent, sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID, -) (string, error) { +) error { ires := make([]InputRoomEvent, len(events)) for i, event := range events { ires[i] = InputRoomEvent{ @@ -77,19 +77,16 @@ func SendEventWithState( StateEventIDs: stateEventIDs, }) - _, err = SendInputRoomEvents(ctx, rsAPI, ires) - return err + return SendInputRoomEvents(ctx, rsAPI, ires) } // SendInputRoomEvents to the roomserver. func SendInputRoomEvents( ctx context.Context, rsAPI RoomserverInternalAPI, ires []InputRoomEvent, -) (eventID string, err error) { +) error { request := InputRoomEventsRequest{InputRoomEvents: ires} var response InputRoomEventsResponse - err = rsAPI.InputRoomEvents(ctx, &request, &response) - eventID = response.EventID - return + return rsAPI.InputRoomEvents(ctx, &request, &response) } // SendInvite event to the roomserver. diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 87bdc5db..7a44ff42 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -19,12 +19,14 @@ import ( "context" "encoding/json" "sync" + "time" "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) type Inputer struct { @@ -33,7 +35,36 @@ type Inputer struct { ServerName gomatrixserverlib.ServerName OutputRoomEventTopic string - mutexes sync.Map // room ID -> *sync.Mutex, protects calls to processRoomEvent + workers sync.Map // room ID -> *inputWorker +} + +type inputTask struct { + ctx context.Context + event *api.InputRoomEvent + wg *sync.WaitGroup + err error // written back by worker, only safe to read when all tasks are done +} + +type inputWorker struct { + r *Inputer + running atomic.Bool + input chan *inputTask +} + +func (w *inputWorker) start() { + if !w.running.CAS(false, true) { + return + } + defer w.running.Store(false) + for { + select { + case task := <-w.input: + _, task.err = w.r.processRoomEvent(task.ctx, task.event) + task.wg.Done() + case <-time.After(time.Second * 5): + return + } + } } // WriteOutputEvents implements OutputRoomEventWriter @@ -73,19 +104,54 @@ func (r *Inputer) InputRoomEvents( ctx context.Context, request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, -) (err error) { +) error { + // Create a wait group. Each task that we dispatch will call Done on + // this wait group so that we know when all of our events have been + // processed. + wg := &sync.WaitGroup{} + wg.Add(len(request.InputRoomEvents)) + tasks := make([]*inputTask, len(request.InputRoomEvents)) + for i, e := range request.InputRoomEvents { + // Work out if we are running per-room workers or if we're just doing + // it on a global basis (e.g. SQLite). roomID := "global" if r.DB.SupportsConcurrentRoomInputs() { roomID = e.Event.RoomID() } - mutex, _ := r.mutexes.LoadOrStore(roomID, &sync.Mutex{}) - mutex.(*sync.Mutex).Lock() - if response.EventID, err = r.processRoomEvent(ctx, request.InputRoomEvents[i]); err != nil { - mutex.(*sync.Mutex).Unlock() - return err + + // Look up the worker, or create it if it doesn't exist. This channel + // is buffered to reduce the chance that we'll be blocked by another + // room - the channel will be quite small as it's just pointer types. + w, _ := r.workers.LoadOrStore(roomID, &inputWorker{ + r: r, + input: make(chan *inputTask, 10), + }) + worker := w.(*inputWorker) + + // Create a task. This contains the input event and a reference to + // the wait group, so that the worker can notify us when this specific + // task has been finished. + tasks[i] = &inputTask{ + ctx: ctx, + event: &request.InputRoomEvents[i], + wg: wg, + } + + // Send the task to the worker. + go worker.start() + worker.input <- tasks[i] + } + + // Wait for all of the workers to return results about our tasks. + wg.Wait() + + // If any of the tasks returned an error, we should probably report + // that back to the caller. + for _, task := range tasks { + if task.err != nil { + return task.err } - mutex.(*sync.Mutex).Unlock() } return nil } diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 69f51f4b..6ee679da 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -38,7 +38,7 @@ import ( // nolint:gocyclo func (r *Inputer) processRoomEvent( ctx context.Context, - input api.InputRoomEvent, + input *api.InputRoomEvent, ) (eventID string, err error) { // Parse and validate the event JSON headered := input.Event @@ -143,7 +143,7 @@ func (r *Inputer) processRoomEvent( func (r *Inputer) calculateAndSetState( ctx context.Context, - input api.InputRoomEvent, + input *api.InputRoomEvent, roomInfo types.RoomInfo, stateAtEvent *types.StateAtEvent, event gomatrixserverlib.Event, diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 0deb7acb..786d4f31 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -114,8 +114,7 @@ func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []js rsAPI := NewInternalAPI(base, &test.NopJSONVerifier{}) hevents := mustLoadEvents(t, ver, events) - _, err = api.SendEvents(ctx, rsAPI, hevents, testOrigin, nil) - if err != nil { + if err = api.SendEvents(ctx, rsAPI, hevents, testOrigin, nil); err != nil { t.Errorf("failed to SendEvents: %s", err) } return rsAPI, dp, hevents |