diff options
Diffstat (limited to 'roomserver/internal/perform')
-rw-r--r-- | roomserver/internal/perform/perform_backfill.go | 547 | ||||
-rw-r--r-- | roomserver/internal/perform/perform_invite.go | 243 | ||||
-rw-r--r-- | roomserver/internal/perform/perform_join.go | 308 | ||||
-rw-r--r-- | roomserver/internal/perform/perform_leave.go | 179 | ||||
-rw-r--r-- | roomserver/internal/perform/perform_publish.go | 25 |
5 files changed, 1302 insertions, 0 deletions
diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go new file mode 100644 index 00000000..ebb66ef4 --- /dev/null +++ b/roomserver/internal/perform/perform_backfill.go @@ -0,0 +1,547 @@ +package perform + +import ( + "context" + "fmt" + + "github.com/matrix-org/dendrite/internal/eventutil" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/auth" + "github.com/matrix-org/dendrite/roomserver/internal/helpers" + "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/sirupsen/logrus" +) + +type Backfiller struct { + ServerName gomatrixserverlib.ServerName + DB storage.Database + FedClient *gomatrixserverlib.FederationClient + KeyRing gomatrixserverlib.JSONVerifier +} + +// PerformBackfill implements api.RoomServerQueryAPI +func (r *Backfiller) PerformBackfill( + ctx context.Context, + request *api.PerformBackfillRequest, + response *api.PerformBackfillResponse, +) error { + // if we are requesting the backfill then we need to do a federation hit + // TODO: we could be more sensible and fetch as many events we already have then request the rest + // which is what the syncapi does already. + if request.ServerName == r.ServerName { + return r.backfillViaFederation(ctx, request, response) + } + // someone else is requesting the backfill, try to service their request. + var err error + var front []string + + // The limit defines the maximum number of events to retrieve, so it also + // defines the highest number of elements in the map below. + visited := make(map[string]bool, request.Limit) + + // this will include these events which is what we want + front = request.PrevEventIDs() + + info, err := r.DB.RoomInfo(ctx, request.RoomID) + if err != nil { + return err + } + if info == nil || info.IsStub { + return fmt.Errorf("PerformBackfill: missing room info for room %s", request.RoomID) + } + + // Scan the event tree for events to send back. + resultNIDs, err := helpers.ScanEventTree(ctx, r.DB, *info, front, visited, request.Limit, request.ServerName) + if err != nil { + return err + } + + // Retrieve events from the list that was filled previously. + var loadedEvents []gomatrixserverlib.Event + loadedEvents, err = helpers.LoadEvents(ctx, r.DB, resultNIDs) + if err != nil { + return err + } + + for _, event := range loadedEvents { + response.Events = append(response.Events, event.Headered(info.RoomVersion)) + } + + return err +} + +func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.PerformBackfillRequest, res *api.PerformBackfillResponse) error { + info, err := r.DB.RoomInfo(ctx, req.RoomID) + if err != nil { + return err + } + if info == nil || info.IsStub { + return fmt.Errorf("backfillViaFederation: missing room info for room %s", req.RoomID) + } + requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName, req.BackwardsExtremities) + // Request 100 items regardless of what the query asks for. + // We don't want to go much higher than this. + // We can't honour exactly the limit as some sytests rely on requesting more for tests to pass + // (so we don't need to hit /state_ids which the test has no listener for) + // Specifically the test "Outbound federation can backfill events" + events, err := gomatrixserverlib.RequestBackfill( + ctx, requester, + r.KeyRing, req.RoomID, info.RoomVersion, req.PrevEventIDs(), 100) + if err != nil { + return err + } + logrus.WithField("room_id", req.RoomID).Infof("backfilled %d events", len(events)) + + // persist these new events - auth checks have already been done + roomNID, backfilledEventMap := persistEvents(ctx, r.DB, events) + if err != nil { + return err + } + + for _, ev := range backfilledEventMap { + // now add state for these events + stateIDs, ok := requester.eventIDToBeforeStateIDs[ev.EventID()] + if !ok { + // this should be impossible as all events returned must have pass Step 5 of the PDU checks + // which requires a list of state IDs. + logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to find state IDs for event which passed auth checks") + continue + } + var entries []types.StateEntry + if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs); err != nil { + // attempt to fetch the missing events + r.fetchAndStoreMissingEvents(ctx, info.RoomVersion, requester, stateIDs) + // try again + entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs) + if err != nil { + logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to get state entries for event") + return err + } + } + + var beforeStateSnapshotNID types.StateSnapshotNID + if beforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, entries); err != nil { + logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to persist state entries to get snapshot nid") + return err + } + if err = r.DB.SetState(ctx, ev.EventNID, beforeStateSnapshotNID); err != nil { + logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to persist snapshot nid") + } + } + + // TODO: update backwards extremities, as that should be moved from syncapi to roomserver at some point. + + res.Events = events + return nil +} + +// fetchAndStoreMissingEvents does a best-effort fetch and store of missing events specified in stateIDs. Returns no error as it is just +// best effort. +func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gomatrixserverlib.RoomVersion, + backfillRequester *backfillRequester, stateIDs []string) { + + servers := backfillRequester.servers + + // work out which are missing + nidMap, err := r.DB.EventNIDs(ctx, stateIDs) + if err != nil { + util.GetLogger(ctx).WithError(err).Warn("cannot query missing events") + return + } + missingMap := make(map[string]*gomatrixserverlib.HeaderedEvent) // id -> event + for _, id := range stateIDs { + if _, ok := nidMap[id]; !ok { + missingMap[id] = nil + } + } + util.GetLogger(ctx).Infof("Fetching %d missing state events (from %d possible servers)", len(missingMap), len(servers)) + + // fetch the events from federation. Loop the servers first so if we find one that works we stick with them + for _, srv := range servers { + for id, ev := range missingMap { + if ev != nil { + continue // already found + } + logger := util.GetLogger(ctx).WithField("server", srv).WithField("event_id", id) + res, err := r.FedClient.GetEvent(ctx, srv, id) + if err != nil { + logger.WithError(err).Warn("failed to get event from server") + continue + } + loader := gomatrixserverlib.NewEventsLoader(roomVer, r.KeyRing, backfillRequester, backfillRequester.ProvideEvents, false) + result, err := loader.LoadAndVerify(ctx, res.PDUs, gomatrixserverlib.TopologicalOrderByPrevEvents) + if err != nil { + logger.WithError(err).Warn("failed to load and verify event") + continue + } + logger.Infof("returned %d PDUs which made events %+v", len(res.PDUs), result) + for _, res := range result { + if res.Error != nil { + logger.WithError(err).Warn("event failed PDU checks") + continue + } + missingMap[id] = res.Event + } + } + } + + var newEvents []gomatrixserverlib.HeaderedEvent + for _, ev := range missingMap { + if ev != nil { + newEvents = append(newEvents, *ev) + } + } + util.GetLogger(ctx).Infof("Persisting %d new events", len(newEvents)) + persistEvents(ctx, r.DB, newEvents) +} + +// backfillRequester implements gomatrixserverlib.BackfillRequester +type backfillRequester struct { + db storage.Database + fedClient *gomatrixserverlib.FederationClient + thisServer gomatrixserverlib.ServerName + bwExtrems map[string][]string + + // per-request state + servers []gomatrixserverlib.ServerName + eventIDToBeforeStateIDs map[string][]string + eventIDMap map[string]gomatrixserverlib.Event +} + +func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName, bwExtrems map[string][]string) *backfillRequester { + return &backfillRequester{ + db: db, + fedClient: fedClient, + thisServer: thisServer, + eventIDToBeforeStateIDs: make(map[string][]string), + eventIDMap: make(map[string]gomatrixserverlib.Event), + bwExtrems: bwExtrems, + } +} + +func (b *backfillRequester) StateIDsBeforeEvent(ctx context.Context, targetEvent gomatrixserverlib.HeaderedEvent) ([]string, error) { + b.eventIDMap[targetEvent.EventID()] = targetEvent.Unwrap() + if ids, ok := b.eventIDToBeforeStateIDs[targetEvent.EventID()]; ok { + return ids, nil + } + if len(targetEvent.PrevEventIDs()) == 0 && targetEvent.Type() == "m.room.create" && targetEvent.StateKeyEquals("") { + util.GetLogger(ctx).WithField("room_id", targetEvent.RoomID()).Info("Backfilled to the beginning of the room") + b.eventIDToBeforeStateIDs[targetEvent.EventID()] = []string{} + return nil, nil + } + // if we have exactly 1 prev event and we know the state of the room at that prev event, then just roll forward the prev event. + // Else, we have to hit /state_ids because either we don't know the state at all at this event (new backwards extremity) or + // we don't know the result of state res to merge forks (2 or more prev_events) + if len(targetEvent.PrevEventIDs()) == 1 { + prevEventID := targetEvent.PrevEventIDs()[0] + prevEvent, ok := b.eventIDMap[prevEventID] + if !ok { + goto FederationHit + } + prevEventStateIDs, ok := b.eventIDToBeforeStateIDs[prevEventID] + if !ok { + goto FederationHit + } + newStateIDs := b.calculateNewStateIDs(targetEvent.Unwrap(), prevEvent, prevEventStateIDs) + if newStateIDs != nil { + b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs + return newStateIDs, nil + } + // else we failed to calculate the new state, so fallthrough + } + +FederationHit: + var lastErr error + logrus.WithField("event_id", targetEvent.EventID()).Info("Requesting /state_ids at event") + for _, srv := range b.servers { // hit any valid server + c := gomatrixserverlib.FederatedStateProvider{ + FedClient: b.fedClient, + RememberAuthEvents: false, + Server: srv, + } + res, err := c.StateIDsBeforeEvent(ctx, targetEvent) + if err != nil { + lastErr = err + continue + } + b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res + return res, nil + } + return nil, lastErr +} + +func (b *backfillRequester) calculateNewStateIDs(targetEvent, prevEvent gomatrixserverlib.Event, prevEventStateIDs []string) []string { + newStateIDs := prevEventStateIDs[:] + if prevEvent.StateKey() == nil { + // state is the same as the previous event + b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs + return newStateIDs + } + + missingState := false // true if we are missing the info for a state event ID + foundEvent := false // true if we found a (type, state_key) match + // find which state ID to replace, if any + for i, id := range newStateIDs { + ev, ok := b.eventIDMap[id] + if !ok { + missingState = true + continue + } + // The state IDs BEFORE the target event are the state IDs BEFORE the prev_event PLUS the prev_event itself + if ev.Type() == prevEvent.Type() && ev.StateKeyEquals(*prevEvent.StateKey()) { + newStateIDs[i] = prevEvent.EventID() + foundEvent = true + break + } + } + if !foundEvent && !missingState { + // we can be certain that this is new state + newStateIDs = append(newStateIDs, prevEvent.EventID()) + foundEvent = true + } + + if foundEvent { + b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs + return newStateIDs + } + return nil +} + +func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatrixserverlib.RoomVersion, + event gomatrixserverlib.HeaderedEvent, eventIDs []string) (map[string]*gomatrixserverlib.Event, error) { + + // try to fetch the events from the database first + events, err := b.ProvideEvents(roomVer, eventIDs) + if err != nil { + // non-fatal, fallthrough + logrus.WithError(err).Info("Failed to fetch events") + } else { + logrus.Infof("Fetched %d/%d events from the database", len(events), len(eventIDs)) + if len(events) == len(eventIDs) { + result := make(map[string]*gomatrixserverlib.Event) + for i := range events { + result[events[i].EventID()] = &events[i] + b.eventIDMap[events[i].EventID()] = events[i] + } + return result, nil + } + } + + c := gomatrixserverlib.FederatedStateProvider{ + FedClient: b.fedClient, + RememberAuthEvents: false, + Server: b.servers[0], + } + result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs) + if err != nil { + return nil, err + } + for eventID, ev := range result { + b.eventIDMap[eventID] = *ev + } + return result, nil +} + +// ServersAtEvent is called when trying to determine which server to request from. +// It returns a list of servers which can be queried for backfill requests. These servers +// will be servers that are in the room already. The entries at the beginning are preferred servers +// and will be tried first. An empty list will fail the request. +// nolint:gocyclo +func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID string) []gomatrixserverlib.ServerName { + // eventID will be a prev_event ID of a backwards extremity, meaning we will not have a database entry for it. Instead, use + // its successor, so look it up. + successor := "" +FindSuccessor: + for sucID, prevEventIDs := range b.bwExtrems { + for _, pe := range prevEventIDs { + if pe == eventID { + successor = sucID + break FindSuccessor + } + } + } + if successor == "" { + logrus.WithField("event_id", eventID).Error("ServersAtEvent: failed to find successor of this event to determine room state") + return nil + } + eventID = successor + + // getMembershipsBeforeEventNID requires a NID, so retrieving the NID for + // the event is necessary. + NIDs, err := b.db.EventNIDs(ctx, []string{eventID}) + if err != nil { + logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get event NID for event") + return nil + } + + info, err := b.db.RoomInfo(ctx, roomID) + if err != nil { + logrus.WithError(err).WithField("room_id", roomID).Error("ServersAtEvent: failed to get RoomInfo for room") + return nil + } + if info == nil || info.IsStub { + logrus.WithField("room_id", roomID).Error("ServersAtEvent: failed to get RoomInfo for room, room is missing") + return nil + } + + stateEntries, err := helpers.StateBeforeEvent(ctx, b.db, *info, NIDs[eventID]) + if err != nil { + logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to load state before event") + return nil + } + + // possibly return all joined servers depending on history visiblity + memberEventsFromVis, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries) + if err != nil { + logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules") + return nil + } + logrus.Infof("ServersAtEvent including %d current events from history visibility", len(memberEventsFromVis)) + + // Retrieve all "m.room.member" state events of "join" membership, which + // contains the list of users in the room before the event, therefore all + // the servers in it at that moment. + memberEvents, err := helpers.GetMembershipsAtState(ctx, b.db, stateEntries, true) + if err != nil { + logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get memberships before event") + return nil + } + memberEvents = append(memberEvents, memberEventsFromVis...) + + // Store the server names in a temporary map to avoid duplicates. + serverSet := make(map[gomatrixserverlib.ServerName]bool) + for _, event := range memberEvents { + serverSet[event.Origin()] = true + } + var servers []gomatrixserverlib.ServerName + for server := range serverSet { + if server == b.thisServer { + continue + } + servers = append(servers, server) + } + b.servers = servers + return servers +} + +// Backfill performs a backfill request to the given server. +// https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid +func (b *backfillRequester) Backfill(ctx context.Context, server gomatrixserverlib.ServerName, roomID string, + fromEventIDs []string, limit int) (*gomatrixserverlib.Transaction, error) { + + tx, err := b.fedClient.Backfill(ctx, server, roomID, limit, fromEventIDs) + return &tx, err +} + +func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.Event, error) { + ctx := context.Background() + nidMap, err := b.db.EventNIDs(ctx, eventIDs) + if err != nil { + logrus.WithError(err).WithField("event_ids", eventIDs).Error("Failed to find events") + return nil, err + } + eventNIDs := make([]types.EventNID, len(nidMap)) + i := 0 + for _, nid := range nidMap { + eventNIDs[i] = nid + i++ + } + eventsWithNids, err := b.db.Events(ctx, eventNIDs) + if err != nil { + logrus.WithError(err).WithField("event_nids", eventNIDs).Error("Failed to load events") + return nil, err + } + events := make([]gomatrixserverlib.Event, len(eventsWithNids)) + for i := range eventsWithNids { + events[i] = eventsWithNids[i].Event + } + return events, nil +} + +// joinEventsFromHistoryVisibility returns all CURRENTLY joined members if the provided state indicated a 'shared' history visibility. +// TODO: Long term we probably want a history_visibility table which stores eventNID | visibility_enum so we can just +// pull all events and then filter by that table. +func joinEventsFromHistoryVisibility( + ctx context.Context, db storage.Database, roomID string, stateEntries []types.StateEntry) ([]types.Event, error) { + + var eventNIDs []types.EventNID + for _, entry := range stateEntries { + // Filter the events to retrieve to only keep the membership events + if entry.EventTypeNID == types.MRoomHistoryVisibilityNID && entry.EventStateKeyNID == types.EmptyStateKeyNID { + eventNIDs = append(eventNIDs, entry.EventNID) + break + } + } + + // Get all of the events in this state + stateEvents, err := db.Events(ctx, eventNIDs) + if err != nil { + return nil, err + } + events := make([]gomatrixserverlib.Event, len(stateEvents)) + for i := range stateEvents { + events[i] = stateEvents[i].Event + } + visibility := auth.HistoryVisibilityForRoom(events) + if visibility != "shared" { + logrus.Infof("ServersAtEvent history visibility not shared: %s", visibility) + return nil, nil + } + // get joined members + info, err := db.RoomInfo(ctx, roomID) + if err != nil { + return nil, err + } + joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, true, false) + if err != nil { + return nil, err + } + return db.Events(ctx, joinEventNIDs) +} + +func persistEvents(ctx context.Context, db storage.Database, events []gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) { + var roomNID types.RoomNID + backfilledEventMap := make(map[string]types.Event) + for j, ev := range events { + nidMap, err := db.EventNIDs(ctx, ev.AuthEventIDs()) + if err != nil { // this shouldn't happen as RequestBackfill already found them + logrus.WithError(err).WithField("auth_events", ev.AuthEventIDs()).Error("Failed to find one or more auth events") + continue + } + authNids := make([]types.EventNID, len(nidMap)) + i := 0 + for _, nid := range nidMap { + authNids[i] = nid + i++ + } + var stateAtEvent types.StateAtEvent + var redactedEventID string + var redactionEvent *gomatrixserverlib.Event + roomNID, stateAtEvent, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids) + if err != nil { + logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to persist event") + continue + } + // If storing this event results in it being redacted, then do so. + // It's also possible for this event to be a redaction which results in another event being + // redacted, which we don't care about since we aren't returning it in this backfill. + if redactedEventID == ev.EventID() { + eventToRedact := ev.Unwrap() + redactedEvent, err := eventutil.RedactEvent(redactionEvent, &eventToRedact) + if err != nil { + logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to redact event") + continue + } + ev = redactedEvent.Headered(ev.RoomVersion) + events[j] = ev + } + backfilledEventMap[ev.EventID()] = types.Event{ + EventNID: stateAtEvent.StateEntry.EventNID, + Event: ev.Unwrap(), + } + } + return roomNID, backfilledEventMap +} diff --git a/roomserver/internal/perform/perform_invite.go b/roomserver/internal/perform/perform_invite.go new file mode 100644 index 00000000..7320388e --- /dev/null +++ b/roomserver/internal/perform/perform_invite.go @@ -0,0 +1,243 @@ +package perform + +import ( + "context" + "fmt" + + federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/internal/helpers" + "github.com/matrix-org/dendrite/roomserver/state" + "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" +) + +type Inviter struct { + DB storage.Database + Cfg *config.RoomServer + FSAPI federationSenderAPI.FederationSenderInternalAPI + + // TODO FIXME: Remove this + RSAPI api.RoomserverInternalAPI +} + +// nolint:gocyclo +func (r *Inviter) PerformInvite( + ctx context.Context, + req *api.PerformInviteRequest, + res *api.PerformInviteResponse, +) ([]api.OutputEvent, error) { + event := req.Event + if event.StateKey() == nil { + return nil, fmt.Errorf("invite must be a state event") + } + + roomID := event.RoomID() + targetUserID := *event.StateKey() + info, err := r.DB.RoomInfo(ctx, roomID) + if err != nil { + return nil, fmt.Errorf("Failed to load RoomInfo: %w", err) + } + + log.WithFields(log.Fields{ + "event_id": event.EventID(), + "room_id": roomID, + "room_version": req.RoomVersion, + "target_user_id": targetUserID, + "room_info_exists": info != nil, + }).Info("processing invite event") + + _, domain, _ := gomatrixserverlib.SplitID('@', targetUserID) + isTargetLocal := domain == r.Cfg.Matrix.ServerName + isOriginLocal := event.Origin() == r.Cfg.Matrix.ServerName + + inviteState := req.InviteRoomState + if len(inviteState) == 0 && info != nil { + var is []gomatrixserverlib.InviteV2StrippedState + if is, err = buildInviteStrippedState(ctx, r.DB, info, req); err == nil { + inviteState = is + } + } + if len(inviteState) == 0 { + if err = event.SetUnsignedField("invite_room_state", struct{}{}); err != nil { + return nil, fmt.Errorf("event.SetUnsignedField: %w", err) + } + } else { + if err = event.SetUnsignedField("invite_room_state", inviteState); err != nil { + return nil, fmt.Errorf("event.SetUnsignedField: %w", err) + } + } + + var isAlreadyJoined bool + if info != nil { + _, isAlreadyJoined, err = r.DB.GetMembership(ctx, info.RoomNID, *event.StateKey()) + if err != nil { + return nil, fmt.Errorf("r.DB.GetMembership: %w", err) + } + } + if isAlreadyJoined { + // If the user is joined to the room then that takes precedence over this + // invite event. It makes little sense to move a user that is already + // joined to the room into the invite state. + // This could plausibly happen if an invite request raced with a join + // request for a user. For example if a user was invited to a public + // room and they joined the room at the same time as the invite was sent. + // The other way this could plausibly happen is if an invite raced with + // a kick. For example if a user was kicked from a room in error and in + // response someone else in the room re-invited them then it is possible + // for the invite request to race with the leave event so that the + // target receives invite before it learns that it has been kicked. + // There are a few ways this could be plausibly handled in the roomserver. + // 1) Store the invite, but mark it as retired. That will result in the + // permanent rejection of that invite event. So even if the target + // user leaves the room and the invite is retransmitted it will be + // ignored. However a new invite with a new event ID would still be + // accepted. + // 2) Silently discard the invite event. This means that if the event + // was retransmitted at a later date after the target user had left + // the room we would accept the invite. However since we hadn't told + // the sending server that the invite had been discarded it would + // have no reason to attempt to retry. + // 3) Signal the sending server that the user is already joined to the + // room. + // For now we will implement option 2. Since in the abesence of a retry + // mechanism it will be equivalent to option 1, and we don't have a + // signalling mechanism to implement option 3. + res.Error = &api.PerformError{ + Code: api.PerformErrorNotAllowed, + Msg: "User is already joined to room", + } + return nil, nil + } + + if isOriginLocal { + // The invite originated locally. Therefore we have a responsibility to + // try and see if the user is allowed to make this invite. We can't do + // this for invites coming in over federation - we have to take those on + // trust. + _, err = helpers.CheckAuthEvents(ctx, r.DB, event, event.AuthEventIDs()) + if err != nil { + log.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", event.AuthEventIDs()).Error( + "processInviteEvent.checkAuthEvents failed for event", + ) + if _, ok := err.(*gomatrixserverlib.NotAllowed); ok { + res.Error = &api.PerformError{ + Msg: err.Error(), + Code: api.PerformErrorNotAllowed, + } + return nil, nil + } + return nil, fmt.Errorf("checkAuthEvents: %w", err) + } + + // If the invite originated from us and the target isn't local then we + // should try and send the invite over federation first. It might be + // that the remote user doesn't exist, in which case we can give up + // processing here. + if req.SendAsServer != api.DoNotSendToOtherServers && !isTargetLocal { + fsReq := &federationSenderAPI.PerformInviteRequest{ + RoomVersion: req.RoomVersion, + Event: event, + InviteRoomState: inviteState, + } + fsRes := &federationSenderAPI.PerformInviteResponse{} + if err = r.FSAPI.PerformInvite(ctx, fsReq, fsRes); err != nil { + res.Error = &api.PerformError{ + Msg: err.Error(), + Code: api.PerformErrorNoOperation, + } + log.WithError(err).WithField("event_id", event.EventID()).Error("r.FSAPI.PerformInvite failed") + return nil, nil + } + event = fsRes.Event + } + + // Send the invite event to the roomserver input stream. This will + // notify existing users in the room about the invite, update the + // membership table and ensure that the event is ready and available + // to use as an auth event when accepting the invite. + inputReq := &api.InputRoomEventsRequest{ + InputRoomEvents: []api.InputRoomEvent{ + { + Kind: api.KindNew, + Event: event, + AuthEventIDs: event.AuthEventIDs(), + SendAsServer: req.SendAsServer, + }, + }, + } + inputRes := &api.InputRoomEventsResponse{} + if err = r.RSAPI.InputRoomEvents(context.Background(), inputReq, inputRes); err != nil { + return nil, fmt.Errorf("r.InputRoomEvents: %w", err) + } + } else { + // The invite originated over federation. Process the membership + // update, which will notify the sync API etc about the incoming + // invite. + updater, err := r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocal, req.RoomVersion) + if err != nil { + return nil, fmt.Errorf("r.DB.MembershipUpdater: %w", err) + } + + unwrapped := event.Unwrap() + outputUpdates, err := helpers.UpdateToInviteMembership(updater, &unwrapped, nil, req.Event.RoomVersion) + if err != nil { + return nil, fmt.Errorf("updateToInviteMembership: %w", err) + } + + if err = updater.Commit(); err != nil { + return nil, fmt.Errorf("updater.Commit: %w", err) + } + + return outputUpdates, nil + } + + return nil, nil +} + +func buildInviteStrippedState( + ctx context.Context, + db storage.Database, + info *types.RoomInfo, + input *api.PerformInviteRequest, +) ([]gomatrixserverlib.InviteV2StrippedState, error) { + stateWanted := []gomatrixserverlib.StateKeyTuple{} + // "If they are set on the room, at least the state for m.room.avatar, m.room.canonical_alias, m.room.join_rules, and m.room.name SHOULD be included." + // https://matrix.org/docs/spec/client_server/r0.6.0#m-room-member + for _, t := range []string{ + gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomCanonicalAlias, + gomatrixserverlib.MRoomAliases, gomatrixserverlib.MRoomJoinRules, + "m.room.avatar", "m.room.encryption", + } { + stateWanted = append(stateWanted, gomatrixserverlib.StateKeyTuple{ + EventType: t, + StateKey: "", + }) + } + roomState := state.NewStateResolution(db, *info) + stateEntries, err := roomState.LoadStateAtSnapshotForStringTuples( + ctx, info.StateSnapshotNID, stateWanted, + ) + if err != nil { + return nil, err + } + stateNIDs := []types.EventNID{} + for _, stateNID := range stateEntries { + stateNIDs = append(stateNIDs, stateNID.EventNID) + } + stateEvents, err := db.Events(ctx, stateNIDs) + if err != nil { + return nil, err + } + inviteState := []gomatrixserverlib.InviteV2StrippedState{ + gomatrixserverlib.NewInviteV2StrippedState(&input.Event.Event), + } + stateEvents = append(stateEvents, types.Event{Event: input.Event.Unwrap()}) + for _, event := range stateEvents { + inviteState = append(inviteState, gomatrixserverlib.NewInviteV2StrippedState(&event.Event)) + } + return inviteState, nil +} diff --git a/roomserver/internal/perform/perform_join.go b/roomserver/internal/perform/perform_join.go new file mode 100644 index 00000000..c8e6e8e6 --- /dev/null +++ b/roomserver/internal/perform/perform_join.go @@ -0,0 +1,308 @@ +package perform + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + fsAPI "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/eventutil" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/internal/helpers" + "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" +) + +type Joiner struct { + ServerName gomatrixserverlib.ServerName + Cfg *config.RoomServer + FSAPI fsAPI.FederationSenderInternalAPI + DB storage.Database + + // TODO FIXME: Remove this + RSAPI api.RoomserverInternalAPI +} + +// PerformJoin handles joining matrix rooms, including over federation by talking to the federationsender. +func (r *Joiner) PerformJoin( + ctx context.Context, + req *api.PerformJoinRequest, + res *api.PerformJoinResponse, +) { + roomID, err := r.performJoin(ctx, req) + if err != nil { + perr, ok := err.(*api.PerformError) + if ok { + res.Error = perr + } else { + res.Error = &api.PerformError{ + Msg: err.Error(), + } + } + } + res.RoomID = roomID +} + +func (r *Joiner) performJoin( + ctx context.Context, + req *api.PerformJoinRequest, +) (string, error) { + _, domain, err := gomatrixserverlib.SplitID('@', req.UserID) + if err != nil { + return "", &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("Supplied user ID %q in incorrect format", req.UserID), + } + } + if domain != r.Cfg.Matrix.ServerName { + return "", &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("User %q does not belong to this homeserver", req.UserID), + } + } + if strings.HasPrefix(req.RoomIDOrAlias, "!") { + return r.performJoinRoomByID(ctx, req) + } + if strings.HasPrefix(req.RoomIDOrAlias, "#") { + return r.performJoinRoomByAlias(ctx, req) + } + return "", &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("Room ID or alias %q is invalid", req.RoomIDOrAlias), + } +} + +func (r *Joiner) performJoinRoomByAlias( + ctx context.Context, + req *api.PerformJoinRequest, +) (string, error) { + // Get the domain part of the room alias. + _, domain, err := gomatrixserverlib.SplitID('#', req.RoomIDOrAlias) + if err != nil { + return "", fmt.Errorf("Alias %q is not in the correct format", req.RoomIDOrAlias) + } + req.ServerNames = append(req.ServerNames, domain) + + // Check if this alias matches our own server configuration. If it + // doesn't then we'll need to try a federated join. + var roomID string + if domain != r.Cfg.Matrix.ServerName { + // The alias isn't owned by us, so we will need to try joining using + // a remote server. + dirReq := fsAPI.PerformDirectoryLookupRequest{ + RoomAlias: req.RoomIDOrAlias, // the room alias to lookup + ServerName: domain, // the server to ask + } + dirRes := fsAPI.PerformDirectoryLookupResponse{} + err = r.FSAPI.PerformDirectoryLookup(ctx, &dirReq, &dirRes) + if err != nil { + logrus.WithError(err).Errorf("error looking up alias %q", req.RoomIDOrAlias) + return "", fmt.Errorf("Looking up alias %q over federation failed: %w", req.RoomIDOrAlias, err) + } + roomID = dirRes.RoomID + req.ServerNames = append(req.ServerNames, dirRes.ServerNames...) + } else { + // Otherwise, look up if we know this room alias locally. + roomID, err = r.DB.GetRoomIDForAlias(ctx, req.RoomIDOrAlias) + if err != nil { + return "", fmt.Errorf("Lookup room alias %q failed: %w", req.RoomIDOrAlias, err) + } + } + + // If the room ID is empty then we failed to look up the alias. + if roomID == "" { + return "", fmt.Errorf("Alias %q not found", req.RoomIDOrAlias) + } + + // If we do, then pluck out the room ID and continue the join. + req.RoomIDOrAlias = roomID + return r.performJoinRoomByID(ctx, req) +} + +// TODO: Break this function up a bit +// nolint:gocyclo +func (r *Joiner) performJoinRoomByID( + ctx context.Context, + req *api.PerformJoinRequest, +) (string, error) { + // Get the domain part of the room ID. + _, domain, err := gomatrixserverlib.SplitID('!', req.RoomIDOrAlias) + if err != nil { + return "", &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("Room ID %q is invalid: %s", req.RoomIDOrAlias, err), + } + } + + // If the server name in the room ID isn't ours then it's a + // possible candidate for finding the room via federation. Add + // it to the list of servers to try. + if domain != r.Cfg.Matrix.ServerName { + req.ServerNames = append(req.ServerNames, domain) + } + + // Prepare the template for the join event. + userID := req.UserID + eb := gomatrixserverlib.EventBuilder{ + Type: gomatrixserverlib.MRoomMember, + Sender: userID, + StateKey: &userID, + RoomID: req.RoomIDOrAlias, + Redacts: "", + } + if err = eb.SetUnsigned(struct{}{}); err != nil { + return "", fmt.Errorf("eb.SetUnsigned: %w", err) + } + + // It is possible for the request to include some "content" for the + // event. We'll always overwrite the "membership" key, but the rest, + // like "display_name" or "avatar_url", will be kept if supplied. + if req.Content == nil { + req.Content = map[string]interface{}{} + } + req.Content["membership"] = gomatrixserverlib.Join + if err = eb.SetContent(req.Content); err != nil { + return "", fmt.Errorf("eb.SetContent: %w", err) + } + + // First work out if this is in response to an existing invite + // from a federated server. If it is then we avoid the situation + // where we might think we know about a room in the following + // section but don't know the latest state as all of our users + // have left. + serverInRoom, _ := helpers.IsServerCurrentlyInRoom(ctx, r.DB, r.ServerName, req.RoomIDOrAlias) + isInvitePending, inviteSender, _, err := helpers.IsInvitePending(ctx, r.DB, req.RoomIDOrAlias, req.UserID) + if err == nil && isInvitePending && !serverInRoom { + // Check if there's an invite pending. + _, inviterDomain, ierr := gomatrixserverlib.SplitID('@', inviteSender) + if ierr != nil { + return "", fmt.Errorf("gomatrixserverlib.SplitID: %w", err) + } + + // Check that the domain isn't ours. If it's local then we don't + // need to do anything as our own copy of the room state will be + // up-to-date. + if inviterDomain != r.Cfg.Matrix.ServerName { + // Add the server of the person who invited us to the server list, + // as they should be a fairly good bet. + req.ServerNames = append(req.ServerNames, inviterDomain) + + // Perform a federated room join. + return req.RoomIDOrAlias, r.performFederatedJoinRoomByID(ctx, req) + } + } + + // Try to construct an actual join event from the template. + // If this succeeds then it is a sign that the room already exists + // locally on the homeserver. + // TODO: Check what happens if the room exists on the server + // but everyone has since left. I suspect it does the wrong thing. + buildRes := api.QueryLatestEventsAndStateResponse{} + event, err := eventutil.BuildEvent( + ctx, // the request context + &eb, // the template join event + r.Cfg.Matrix, // the server configuration + time.Now(), // the event timestamp to use + r.RSAPI, // the roomserver API to use + &buildRes, // the query response + ) + + switch err { + case nil: + // The room join is local. Send the new join event into the + // roomserver. First of all check that the user isn't already + // a member of the room. + alreadyJoined := false + for _, se := range buildRes.StateEvents { + if !se.StateKeyEquals(userID) { + continue + } + if membership, merr := se.Membership(); merr == nil { + alreadyJoined = (membership == gomatrixserverlib.Join) + break + } + } + + // If we haven't already joined the room then send an event + // into the room changing our membership status. + if !alreadyJoined { + inputReq := api.InputRoomEventsRequest{ + InputRoomEvents: []api.InputRoomEvent{ + { + Kind: api.KindNew, + Event: event.Headered(buildRes.RoomVersion), + AuthEventIDs: event.AuthEventIDs(), + SendAsServer: string(r.Cfg.Matrix.ServerName), + }, + }, + } + inputRes := api.InputRoomEventsResponse{} + if err = r.RSAPI.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil { + var notAllowed *gomatrixserverlib.NotAllowed + if errors.As(err, ¬Allowed) { + return "", &api.PerformError{ + Code: api.PerformErrorNotAllowed, + Msg: fmt.Sprintf("InputRoomEvents auth failed: %s", err), + } + } + return "", fmt.Errorf("r.InputRoomEvents: %w", err) + } + } + + case eventutil.ErrRoomNoExists: + // The room doesn't exist locally. If the room ID looks like it should + // be ours then this probably means that we've nuked our database at + // some point. + if domain == r.Cfg.Matrix.ServerName { + // If there are no more server names to try then give up here. + // Otherwise we'll try a federated join as normal, since it's quite + // possible that the room still exists on other servers. + if len(req.ServerNames) == 0 { + return "", &api.PerformError{ + Code: api.PerformErrorNoRoom, + Msg: fmt.Sprintf("Room ID %q does not exist", req.RoomIDOrAlias), + } + } + } + + // Perform a federated room join. + return req.RoomIDOrAlias, r.performFederatedJoinRoomByID(ctx, req) + + default: + // Something else went wrong. + return "", fmt.Errorf("Error joining local room: %q", err) + } + + // By this point, if req.RoomIDOrAlias contained an alias, then + // it will have been overwritten with a room ID by performJoinRoomByAlias. + // We should now include this in the response so that the CS API can + // return the right room ID. + return req.RoomIDOrAlias, nil +} + +func (r *Joiner) performFederatedJoinRoomByID( + ctx context.Context, + req *api.PerformJoinRequest, +) error { + // Try joining by all of the supplied server names. + fedReq := fsAPI.PerformJoinRequest{ + RoomID: req.RoomIDOrAlias, // the room ID to try and join + UserID: req.UserID, // the user ID joining the room + ServerNames: req.ServerNames, // the server to try joining with + Content: req.Content, // the membership event content + } + fedRes := fsAPI.PerformJoinResponse{} + r.FSAPI.PerformJoin(ctx, &fedReq, &fedRes) + if fedRes.LastError != nil { + return &api.PerformError{ + Code: api.PerformErrRemote, + Msg: fedRes.LastError.Message, + RemoteCode: fedRes.LastError.Code, + } + } + return nil +} diff --git a/roomserver/internal/perform/perform_leave.go b/roomserver/internal/perform/perform_leave.go new file mode 100644 index 00000000..b4053eed --- /dev/null +++ b/roomserver/internal/perform/perform_leave.go @@ -0,0 +1,179 @@ +package perform + +import ( + "context" + "fmt" + "strings" + "time" + + fsAPI "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/eventutil" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/internal/helpers" + "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/gomatrixserverlib" +) + +type Leaver struct { + Cfg *config.RoomServer + DB storage.Database + FSAPI fsAPI.FederationSenderInternalAPI + + // TODO FIXME: Remove this + RSAPI api.RoomserverInternalAPI +} + +// WriteOutputEvents implements OutputRoomEventWriter +func (r *Leaver) PerformLeave( + ctx context.Context, + req *api.PerformLeaveRequest, + res *api.PerformLeaveResponse, +) ([]api.OutputEvent, error) { + _, domain, err := gomatrixserverlib.SplitID('@', req.UserID) + if err != nil { + return nil, fmt.Errorf("Supplied user ID %q in incorrect format", req.UserID) + } + if domain != r.Cfg.Matrix.ServerName { + return nil, fmt.Errorf("User %q does not belong to this homeserver", req.UserID) + } + if strings.HasPrefix(req.RoomID, "!") { + return r.performLeaveRoomByID(ctx, req, res) + } + return nil, fmt.Errorf("Room ID %q is invalid", req.RoomID) +} + +func (r *Leaver) performLeaveRoomByID( + ctx context.Context, + req *api.PerformLeaveRequest, + res *api.PerformLeaveResponse, // nolint:unparam +) ([]api.OutputEvent, error) { + // If there's an invite outstanding for the room then respond to + // that. + isInvitePending, senderUser, eventID, err := helpers.IsInvitePending(ctx, r.DB, req.RoomID, req.UserID) + if err == nil && isInvitePending { + return r.performRejectInvite(ctx, req, res, senderUser, eventID) + } + + // There's no invite pending, so first of all we want to find out + // if the room exists and if the user is actually in it. + latestReq := api.QueryLatestEventsAndStateRequest{ + RoomID: req.RoomID, + StateToFetch: []gomatrixserverlib.StateKeyTuple{ + { + EventType: gomatrixserverlib.MRoomMember, + StateKey: req.UserID, + }, + }, + } + latestRes := api.QueryLatestEventsAndStateResponse{} + if err = r.RSAPI.QueryLatestEventsAndState(ctx, &latestReq, &latestRes); err != nil { + return nil, err + } + if !latestRes.RoomExists { + return nil, fmt.Errorf("Room %q does not exist", req.RoomID) + } + + // Now let's see if the user is in the room. + if len(latestRes.StateEvents) == 0 { + return nil, fmt.Errorf("User %q is not a member of room %q", req.UserID, req.RoomID) + } + membership, err := latestRes.StateEvents[0].Membership() + if err != nil { + return nil, fmt.Errorf("Error getting membership: %w", err) + } + if membership != gomatrixserverlib.Join { + // TODO: should be able to handle "invite" in this case too, if + // it's a case of kicking or banning or such + return nil, fmt.Errorf("User %q is not joined to the room (membership is %q)", req.UserID, membership) + } + + // Prepare the template for the leave event. + userID := req.UserID + eb := gomatrixserverlib.EventBuilder{ + Type: gomatrixserverlib.MRoomMember, + Sender: userID, + StateKey: &userID, + RoomID: req.RoomID, + Redacts: "", + } + if err = eb.SetContent(map[string]interface{}{"membership": "leave"}); err != nil { + return nil, fmt.Errorf("eb.SetContent: %w", err) + } + if err = eb.SetUnsigned(struct{}{}); err != nil { + return nil, fmt.Errorf("eb.SetUnsigned: %w", err) + } + + // We know that the user is in the room at this point so let's build + // a leave event. + // TODO: Check what happens if the room exists on the server + // but everyone has since left. I suspect it does the wrong thing. + buildRes := api.QueryLatestEventsAndStateResponse{} + event, err := eventutil.BuildEvent( + ctx, // the request context + &eb, // the template leave event + r.Cfg.Matrix, // the server configuration + time.Now(), // the event timestamp to use + r.RSAPI, // the roomserver API to use + &buildRes, // the query response + ) + if err != nil { + return nil, fmt.Errorf("eventutil.BuildEvent: %w", err) + } + + // Give our leave event to the roomserver input stream. The + // roomserver will process the membership change and notify + // downstream automatically. + inputReq := api.InputRoomEventsRequest{ + InputRoomEvents: []api.InputRoomEvent{ + { + Kind: api.KindNew, + Event: event.Headered(buildRes.RoomVersion), + AuthEventIDs: event.AuthEventIDs(), + SendAsServer: string(r.Cfg.Matrix.ServerName), + }, + }, + } + inputRes := api.InputRoomEventsResponse{} + if err = r.RSAPI.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil { + return nil, fmt.Errorf("r.InputRoomEvents: %w", err) + } + + return nil, nil +} + +func (r *Leaver) performRejectInvite( + ctx context.Context, + req *api.PerformLeaveRequest, + res *api.PerformLeaveResponse, // nolint:unparam + senderUser, eventID string, +) ([]api.OutputEvent, error) { + _, domain, err := gomatrixserverlib.SplitID('@', senderUser) + if err != nil { + return nil, fmt.Errorf("User ID %q invalid: %w", senderUser, err) + } + + // Ask the federation sender to perform a federated leave for us. + leaveReq := fsAPI.PerformLeaveRequest{ + RoomID: req.RoomID, + UserID: req.UserID, + ServerNames: []gomatrixserverlib.ServerName{domain}, + } + leaveRes := fsAPI.PerformLeaveResponse{} + if err := r.FSAPI.PerformLeave(ctx, &leaveReq, &leaveRes); err != nil { + return nil, err + } + + // Withdraw the invite, so that the sync API etc are + // notified that we rejected it. + return []api.OutputEvent{ + { + Type: api.OutputTypeRetireInviteEvent, + RetireInviteEvent: &api.OutputRetireInviteEvent{ + EventID: eventID, + Membership: "leave", + TargetUserID: req.UserID, + }, + }, + }, nil +} diff --git a/roomserver/internal/perform/perform_publish.go b/roomserver/internal/perform/perform_publish.go new file mode 100644 index 00000000..aab282f3 --- /dev/null +++ b/roomserver/internal/perform/perform_publish.go @@ -0,0 +1,25 @@ +package perform + +import ( + "context" + + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/storage" +) + +type Publisher struct { + DB storage.Database +} + +func (r *Publisher) PerformPublish( + ctx context.Context, + req *api.PerformPublishRequest, + res *api.PerformPublishResponse, +) { + err := r.DB.PublishRoom(ctx, req.RoomID, req.Visibility == "public") + if err != nil { + res.Error = &api.PerformError{ + Msg: err.Error(), + } + } +} |