aboutsummaryrefslogtreecommitdiff
path: root/federationsender/consumers
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-04-03 14:29:06 +0100
committerGitHub <noreply@github.com>2020-04-03 14:29:06 +0100
commit067b87506357c996fd6ddb11271db9469ad4ce80 (patch)
treee9127d78567b7676ba5ee607e9381ef4e0358911 /federationsender/consumers
parent955244c09298d0e6c870377dad3af2ffa1f5e578 (diff)
Invites v2 endpoint (#952)
* Start converting v1 invite endpoint to v2 * Update gomatrixserverlib * Early federationsender code for sending invites * Sending invites sorta happens now * Populate invite request with stripped state * Remodel a bit, don't reflect received invites * Handle invite_room_state * Handle room versions a bit better * Update gomatrixserverlib * Tweak order in destinationQueue.next * Revert check in processMessage * Tweak federation sender destination queue code a bit * Add comments
Diffstat (limited to 'federationsender/consumers')
-rw-r--r--federationsender/consumers/roomserver.go120
1 files changed, 102 insertions, 18 deletions
diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go
index 8ab2affe..f59405af 100644
--- a/federationsender/consumers/roomserver.go
+++ b/federationsender/consumers/roomserver.go
@@ -32,6 +32,7 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
+ cfg *config.Dendrite
roomServerConsumer *common.ContinualConsumer
db storage.Database
queues *queue.OutgoingQueues
@@ -52,6 +53,7 @@ func NewOutputRoomEventConsumer(
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
+ cfg: cfg,
roomServerConsumer: &consumer,
db: store,
queues: queues,
@@ -79,29 +81,48 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
- if output.Type != api.OutputTypeNewRoomEvent {
+
+ switch output.Type {
+ case api.OutputTypeNewRoomEvent:
+ ev := &output.NewRoomEvent.Event
+ log.WithFields(log.Fields{
+ "event_id": ev.EventID(),
+ "room_id": ev.RoomID(),
+ "send_as_server": output.NewRoomEvent.SendAsServer,
+ }).Info("received room event from roomserver")
+
+ if err := s.processMessage(*output.NewRoomEvent); err != nil {
+ // panic rather than continue with an inconsistent database
+ log.WithFields(log.Fields{
+ "event": string(ev.JSON()),
+ "add": output.NewRoomEvent.AddsStateEventIDs,
+ "del": output.NewRoomEvent.RemovesStateEventIDs,
+ log.ErrorKey: err,
+ }).Panicf("roomserver output log: write room event failure")
+ return nil
+ }
+ case api.OutputTypeNewInviteEvent:
+ ev := &output.NewInviteEvent.Event
+ log.WithFields(log.Fields{
+ "event_id": ev.EventID(),
+ "room_id": ev.RoomID(),
+ "state_key": ev.StateKey(),
+ }).Info("received invite event from roomserver")
+
+ if err := s.processInvite(*output.NewInviteEvent); err != nil {
+ // panic rather than continue with an inconsistent database
+ log.WithFields(log.Fields{
+ "event": string(ev.JSON()),
+ log.ErrorKey: err,
+ }).Panicf("roomserver output log: write invite event failure")
+ return nil
+ }
+ default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
}
- ev := &output.NewRoomEvent.Event
- log.WithFields(log.Fields{
- "event_id": ev.EventID(),
- "room_id": ev.RoomID(),
- "send_as_server": output.NewRoomEvent.SendAsServer,
- }).Info("received event from roomserver")
-
- if err := s.processMessage(*output.NewRoomEvent); err != nil {
- // panic rather than continue with an inconsistent database
- log.WithFields(log.Fields{
- "event": string(ev.JSON()),
- log.ErrorKey: err,
- "add": output.NewRoomEvent.AddsStateEventIDs,
- "del": output.NewRoomEvent.RemovesStateEventIDs,
- }).Panicf("roomserver output log: write event failure")
- return nil
- }
return nil
}
@@ -159,6 +180,69 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
)
}
+// processInvite handles an invite event for sending over federation.
+func (s *OutputRoomEventConsumer) processInvite(oie api.OutputNewInviteEvent) error {
+ // Don't try to reflect and resend invites that didn't originate from us.
+ if s.cfg.Matrix.ServerName != oie.Event.Origin() {
+ return nil
+ }
+
+ // When sending a v2 invite, the inviting server should try and include
+ // a "stripped down" version of the room state. This is pretty much just
+ // enough information for the remote side to show something useful to the
+ // user, like the room name, aliases etc.
+ strippedState := []gomatrixserverlib.InviteV2StrippedState{}
+ stateWanted := []string{
+ gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomCanonicalAlias,
+ gomatrixserverlib.MRoomAliases, gomatrixserverlib.MRoomJoinRules,
+ }
+
+ // For each of the state keys that we want to try and send, ask the
+ // roomserver if we have a state event for that room that matches the
+ // state key.
+ for _, wanted := range stateWanted {
+ queryReq := api.QueryLatestEventsAndStateRequest{
+ RoomID: oie.Event.RoomID(),
+ StateToFetch: []gomatrixserverlib.StateKeyTuple{
+ gomatrixserverlib.StateKeyTuple{
+ EventType: wanted,
+ StateKey: "",
+ },
+ },
+ }
+ // If this fails then we just move onto the next event - we don't
+ // actually know at this point whether the room even has that type
+ // of state.
+ queryRes := api.QueryLatestEventsAndStateResponse{}
+ if err := s.query.QueryLatestEventsAndState(context.TODO(), &queryReq, &queryRes); err != nil {
+ log.WithFields(log.Fields{
+ "room_id": queryReq.RoomID,
+ "event_type": wanted,
+ }).WithError(err).Info("couldn't find state to strip")
+ continue
+ }
+ // Append the stripped down copy of the state to our list.
+ for _, headeredEvent := range queryRes.StateEvents {
+ event := headeredEvent.Unwrap()
+ strippedState = append(strippedState, gomatrixserverlib.NewInviteV2StrippedState(&event))
+
+ log.WithFields(log.Fields{
+ "room_id": queryReq.RoomID,
+ "event_type": event.Type(),
+ }).Info("adding stripped state")
+ }
+ }
+
+ // Build the invite request with the info we've got.
+ inviteReq, err := gomatrixserverlib.NewInviteV2Request(&oie.Event, strippedState)
+ if err != nil {
+ return fmt.Errorf("gomatrixserverlib.NewInviteV2Request: %w", err)
+ }
+
+ // Send the event.
+ return s.queues.SendInvite(&inviteReq)
+}
+
// joinedHostsAtEvent works out a list of matrix servers that were joined to
// the room at the event.
// It is important to use the state at the event for sending messages because: