diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-04-03 14:29:06 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-03 14:29:06 +0100 |
commit | 067b87506357c996fd6ddb11271db9469ad4ce80 (patch) | |
tree | e9127d78567b7676ba5ee607e9381ef4e0358911 /federationsender/consumers | |
parent | 955244c09298d0e6c870377dad3af2ffa1f5e578 (diff) |
Invites v2 endpoint (#952)
* Start converting v1 invite endpoint to v2
* Update gomatrixserverlib
* Early federationsender code for sending invites
* Sending invites sorta happens now
* Populate invite request with stripped state
* Remodel a bit, don't reflect received invites
* Handle invite_room_state
* Handle room versions a bit better
* Update gomatrixserverlib
* Tweak order in destinationQueue.next
* Revert check in processMessage
* Tweak federation sender destination queue code a bit
* Add comments
Diffstat (limited to 'federationsender/consumers')
-rw-r--r-- | federationsender/consumers/roomserver.go | 120 |
1 files changed, 102 insertions, 18 deletions
diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 8ab2affe..f59405af 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -32,6 +32,7 @@ import ( // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { + cfg *config.Dendrite roomServerConsumer *common.ContinualConsumer db storage.Database queues *queue.OutgoingQueues @@ -52,6 +53,7 @@ func NewOutputRoomEventConsumer( PartitionStore: store, } s := &OutputRoomEventConsumer{ + cfg: cfg, roomServerConsumer: &consumer, db: store, queues: queues, @@ -79,29 +81,48 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { log.WithError(err).Errorf("roomserver output log: message parse failure") return nil } - if output.Type != api.OutputTypeNewRoomEvent { + + switch output.Type { + case api.OutputTypeNewRoomEvent: + ev := &output.NewRoomEvent.Event + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_id": ev.RoomID(), + "send_as_server": output.NewRoomEvent.SendAsServer, + }).Info("received room event from roomserver") + + if err := s.processMessage(*output.NewRoomEvent); err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + "add": output.NewRoomEvent.AddsStateEventIDs, + "del": output.NewRoomEvent.RemovesStateEventIDs, + log.ErrorKey: err, + }).Panicf("roomserver output log: write room event failure") + return nil + } + case api.OutputTypeNewInviteEvent: + ev := &output.NewInviteEvent.Event + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_id": ev.RoomID(), + "state_key": ev.StateKey(), + }).Info("received invite event from roomserver") + + if err := s.processInvite(*output.NewInviteEvent); err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + log.ErrorKey: err, + }).Panicf("roomserver output log: write invite event failure") + return nil + } + default: log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", ) return nil } - ev := &output.NewRoomEvent.Event - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "room_id": ev.RoomID(), - "send_as_server": output.NewRoomEvent.SendAsServer, - }).Info("received event from roomserver") - - if err := s.processMessage(*output.NewRoomEvent); err != nil { - // panic rather than continue with an inconsistent database - log.WithFields(log.Fields{ - "event": string(ev.JSON()), - log.ErrorKey: err, - "add": output.NewRoomEvent.AddsStateEventIDs, - "del": output.NewRoomEvent.RemovesStateEventIDs, - }).Panicf("roomserver output log: write event failure") - return nil - } return nil } @@ -159,6 +180,69 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err ) } +// processInvite handles an invite event for sending over federation. +func (s *OutputRoomEventConsumer) processInvite(oie api.OutputNewInviteEvent) error { + // Don't try to reflect and resend invites that didn't originate from us. + if s.cfg.Matrix.ServerName != oie.Event.Origin() { + return nil + } + + // When sending a v2 invite, the inviting server should try and include + // a "stripped down" version of the room state. This is pretty much just + // enough information for the remote side to show something useful to the + // user, like the room name, aliases etc. + strippedState := []gomatrixserverlib.InviteV2StrippedState{} + stateWanted := []string{ + gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomCanonicalAlias, + gomatrixserverlib.MRoomAliases, gomatrixserverlib.MRoomJoinRules, + } + + // For each of the state keys that we want to try and send, ask the + // roomserver if we have a state event for that room that matches the + // state key. + for _, wanted := range stateWanted { + queryReq := api.QueryLatestEventsAndStateRequest{ + RoomID: oie.Event.RoomID(), + StateToFetch: []gomatrixserverlib.StateKeyTuple{ + gomatrixserverlib.StateKeyTuple{ + EventType: wanted, + StateKey: "", + }, + }, + } + // If this fails then we just move onto the next event - we don't + // actually know at this point whether the room even has that type + // of state. + queryRes := api.QueryLatestEventsAndStateResponse{} + if err := s.query.QueryLatestEventsAndState(context.TODO(), &queryReq, &queryRes); err != nil { + log.WithFields(log.Fields{ + "room_id": queryReq.RoomID, + "event_type": wanted, + }).WithError(err).Info("couldn't find state to strip") + continue + } + // Append the stripped down copy of the state to our list. + for _, headeredEvent := range queryRes.StateEvents { + event := headeredEvent.Unwrap() + strippedState = append(strippedState, gomatrixserverlib.NewInviteV2StrippedState(&event)) + + log.WithFields(log.Fields{ + "room_id": queryReq.RoomID, + "event_type": event.Type(), + }).Info("adding stripped state") + } + } + + // Build the invite request with the info we've got. + inviteReq, err := gomatrixserverlib.NewInviteV2Request(&oie.Event, strippedState) + if err != nil { + return fmt.Errorf("gomatrixserverlib.NewInviteV2Request: %w", err) + } + + // Send the event. + return s.queues.SendInvite(&inviteReq) +} + // joinedHostsAtEvent works out a list of matrix servers that were joined to // the room at the event. // It is important to use the state at the event for sending messages because: |