aboutsummaryrefslogtreecommitdiff
path: root/roomserver/internal/perform
diff options
context:
space:
mode:
Diffstat (limited to 'roomserver/internal/perform')
-rw-r--r--roomserver/internal/perform/perform_backfill.go547
-rw-r--r--roomserver/internal/perform/perform_invite.go243
-rw-r--r--roomserver/internal/perform/perform_join.go308
-rw-r--r--roomserver/internal/perform/perform_leave.go179
-rw-r--r--roomserver/internal/perform/perform_publish.go25
5 files changed, 1302 insertions, 0 deletions
diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go
new file mode 100644
index 00000000..ebb66ef4
--- /dev/null
+++ b/roomserver/internal/perform/perform_backfill.go
@@ -0,0 +1,547 @@
+package perform
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/matrix-org/dendrite/internal/eventutil"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/roomserver/auth"
+ "github.com/matrix-org/dendrite/roomserver/internal/helpers"
+ "github.com/matrix-org/dendrite/roomserver/storage"
+ "github.com/matrix-org/dendrite/roomserver/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ "github.com/sirupsen/logrus"
+)
+
+type Backfiller struct {
+ ServerName gomatrixserverlib.ServerName
+ DB storage.Database
+ FedClient *gomatrixserverlib.FederationClient
+ KeyRing gomatrixserverlib.JSONVerifier
+}
+
+// PerformBackfill implements api.RoomServerQueryAPI
+func (r *Backfiller) PerformBackfill(
+ ctx context.Context,
+ request *api.PerformBackfillRequest,
+ response *api.PerformBackfillResponse,
+) error {
+ // if we are requesting the backfill then we need to do a federation hit
+ // TODO: we could be more sensible and fetch as many events we already have then request the rest
+ // which is what the syncapi does already.
+ if request.ServerName == r.ServerName {
+ return r.backfillViaFederation(ctx, request, response)
+ }
+ // someone else is requesting the backfill, try to service their request.
+ var err error
+ var front []string
+
+ // The limit defines the maximum number of events to retrieve, so it also
+ // defines the highest number of elements in the map below.
+ visited := make(map[string]bool, request.Limit)
+
+ // this will include these events which is what we want
+ front = request.PrevEventIDs()
+
+ info, err := r.DB.RoomInfo(ctx, request.RoomID)
+ if err != nil {
+ return err
+ }
+ if info == nil || info.IsStub {
+ return fmt.Errorf("PerformBackfill: missing room info for room %s", request.RoomID)
+ }
+
+ // Scan the event tree for events to send back.
+ resultNIDs, err := helpers.ScanEventTree(ctx, r.DB, *info, front, visited, request.Limit, request.ServerName)
+ if err != nil {
+ return err
+ }
+
+ // Retrieve events from the list that was filled previously.
+ var loadedEvents []gomatrixserverlib.Event
+ loadedEvents, err = helpers.LoadEvents(ctx, r.DB, resultNIDs)
+ if err != nil {
+ return err
+ }
+
+ for _, event := range loadedEvents {
+ response.Events = append(response.Events, event.Headered(info.RoomVersion))
+ }
+
+ return err
+}
+
+func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.PerformBackfillRequest, res *api.PerformBackfillResponse) error {
+ info, err := r.DB.RoomInfo(ctx, req.RoomID)
+ if err != nil {
+ return err
+ }
+ if info == nil || info.IsStub {
+ return fmt.Errorf("backfillViaFederation: missing room info for room %s", req.RoomID)
+ }
+ requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName, req.BackwardsExtremities)
+ // Request 100 items regardless of what the query asks for.
+ // We don't want to go much higher than this.
+ // We can't honour exactly the limit as some sytests rely on requesting more for tests to pass
+ // (so we don't need to hit /state_ids which the test has no listener for)
+ // Specifically the test "Outbound federation can backfill events"
+ events, err := gomatrixserverlib.RequestBackfill(
+ ctx, requester,
+ r.KeyRing, req.RoomID, info.RoomVersion, req.PrevEventIDs(), 100)
+ if err != nil {
+ return err
+ }
+ logrus.WithField("room_id", req.RoomID).Infof("backfilled %d events", len(events))
+
+ // persist these new events - auth checks have already been done
+ roomNID, backfilledEventMap := persistEvents(ctx, r.DB, events)
+ if err != nil {
+ return err
+ }
+
+ for _, ev := range backfilledEventMap {
+ // now add state for these events
+ stateIDs, ok := requester.eventIDToBeforeStateIDs[ev.EventID()]
+ if !ok {
+ // this should be impossible as all events returned must have pass Step 5 of the PDU checks
+ // which requires a list of state IDs.
+ logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to find state IDs for event which passed auth checks")
+ continue
+ }
+ var entries []types.StateEntry
+ if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs); err != nil {
+ // attempt to fetch the missing events
+ r.fetchAndStoreMissingEvents(ctx, info.RoomVersion, requester, stateIDs)
+ // try again
+ entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs)
+ if err != nil {
+ logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to get state entries for event")
+ return err
+ }
+ }
+
+ var beforeStateSnapshotNID types.StateSnapshotNID
+ if beforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, entries); err != nil {
+ logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to persist state entries to get snapshot nid")
+ return err
+ }
+ if err = r.DB.SetState(ctx, ev.EventNID, beforeStateSnapshotNID); err != nil {
+ logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to persist snapshot nid")
+ }
+ }
+
+ // TODO: update backwards extremities, as that should be moved from syncapi to roomserver at some point.
+
+ res.Events = events
+ return nil
+}
+
+// fetchAndStoreMissingEvents does a best-effort fetch and store of missing events specified in stateIDs. Returns no error as it is just
+// best effort.
+func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gomatrixserverlib.RoomVersion,
+ backfillRequester *backfillRequester, stateIDs []string) {
+
+ servers := backfillRequester.servers
+
+ // work out which are missing
+ nidMap, err := r.DB.EventNIDs(ctx, stateIDs)
+ if err != nil {
+ util.GetLogger(ctx).WithError(err).Warn("cannot query missing events")
+ return
+ }
+ missingMap := make(map[string]*gomatrixserverlib.HeaderedEvent) // id -> event
+ for _, id := range stateIDs {
+ if _, ok := nidMap[id]; !ok {
+ missingMap[id] = nil
+ }
+ }
+ util.GetLogger(ctx).Infof("Fetching %d missing state events (from %d possible servers)", len(missingMap), len(servers))
+
+ // fetch the events from federation. Loop the servers first so if we find one that works we stick with them
+ for _, srv := range servers {
+ for id, ev := range missingMap {
+ if ev != nil {
+ continue // already found
+ }
+ logger := util.GetLogger(ctx).WithField("server", srv).WithField("event_id", id)
+ res, err := r.FedClient.GetEvent(ctx, srv, id)
+ if err != nil {
+ logger.WithError(err).Warn("failed to get event from server")
+ continue
+ }
+ loader := gomatrixserverlib.NewEventsLoader(roomVer, r.KeyRing, backfillRequester, backfillRequester.ProvideEvents, false)
+ result, err := loader.LoadAndVerify(ctx, res.PDUs, gomatrixserverlib.TopologicalOrderByPrevEvents)
+ if err != nil {
+ logger.WithError(err).Warn("failed to load and verify event")
+ continue
+ }
+ logger.Infof("returned %d PDUs which made events %+v", len(res.PDUs), result)
+ for _, res := range result {
+ if res.Error != nil {
+ logger.WithError(err).Warn("event failed PDU checks")
+ continue
+ }
+ missingMap[id] = res.Event
+ }
+ }
+ }
+
+ var newEvents []gomatrixserverlib.HeaderedEvent
+ for _, ev := range missingMap {
+ if ev != nil {
+ newEvents = append(newEvents, *ev)
+ }
+ }
+ util.GetLogger(ctx).Infof("Persisting %d new events", len(newEvents))
+ persistEvents(ctx, r.DB, newEvents)
+}
+
+// backfillRequester implements gomatrixserverlib.BackfillRequester
+type backfillRequester struct {
+ db storage.Database
+ fedClient *gomatrixserverlib.FederationClient
+ thisServer gomatrixserverlib.ServerName
+ bwExtrems map[string][]string
+
+ // per-request state
+ servers []gomatrixserverlib.ServerName
+ eventIDToBeforeStateIDs map[string][]string
+ eventIDMap map[string]gomatrixserverlib.Event
+}
+
+func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName, bwExtrems map[string][]string) *backfillRequester {
+ return &backfillRequester{
+ db: db,
+ fedClient: fedClient,
+ thisServer: thisServer,
+ eventIDToBeforeStateIDs: make(map[string][]string),
+ eventIDMap: make(map[string]gomatrixserverlib.Event),
+ bwExtrems: bwExtrems,
+ }
+}
+
+func (b *backfillRequester) StateIDsBeforeEvent(ctx context.Context, targetEvent gomatrixserverlib.HeaderedEvent) ([]string, error) {
+ b.eventIDMap[targetEvent.EventID()] = targetEvent.Unwrap()
+ if ids, ok := b.eventIDToBeforeStateIDs[targetEvent.EventID()]; ok {
+ return ids, nil
+ }
+ if len(targetEvent.PrevEventIDs()) == 0 && targetEvent.Type() == "m.room.create" && targetEvent.StateKeyEquals("") {
+ util.GetLogger(ctx).WithField("room_id", targetEvent.RoomID()).Info("Backfilled to the beginning of the room")
+ b.eventIDToBeforeStateIDs[targetEvent.EventID()] = []string{}
+ return nil, nil
+ }
+ // if we have exactly 1 prev event and we know the state of the room at that prev event, then just roll forward the prev event.
+ // Else, we have to hit /state_ids because either we don't know the state at all at this event (new backwards extremity) or
+ // we don't know the result of state res to merge forks (2 or more prev_events)
+ if len(targetEvent.PrevEventIDs()) == 1 {
+ prevEventID := targetEvent.PrevEventIDs()[0]
+ prevEvent, ok := b.eventIDMap[prevEventID]
+ if !ok {
+ goto FederationHit
+ }
+ prevEventStateIDs, ok := b.eventIDToBeforeStateIDs[prevEventID]
+ if !ok {
+ goto FederationHit
+ }
+ newStateIDs := b.calculateNewStateIDs(targetEvent.Unwrap(), prevEvent, prevEventStateIDs)
+ if newStateIDs != nil {
+ b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs
+ return newStateIDs, nil
+ }
+ // else we failed to calculate the new state, so fallthrough
+ }
+
+FederationHit:
+ var lastErr error
+ logrus.WithField("event_id", targetEvent.EventID()).Info("Requesting /state_ids at event")
+ for _, srv := range b.servers { // hit any valid server
+ c := gomatrixserverlib.FederatedStateProvider{
+ FedClient: b.fedClient,
+ RememberAuthEvents: false,
+ Server: srv,
+ }
+ res, err := c.StateIDsBeforeEvent(ctx, targetEvent)
+ if err != nil {
+ lastErr = err
+ continue
+ }
+ b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res
+ return res, nil
+ }
+ return nil, lastErr
+}
+
+func (b *backfillRequester) calculateNewStateIDs(targetEvent, prevEvent gomatrixserverlib.Event, prevEventStateIDs []string) []string {
+ newStateIDs := prevEventStateIDs[:]
+ if prevEvent.StateKey() == nil {
+ // state is the same as the previous event
+ b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs
+ return newStateIDs
+ }
+
+ missingState := false // true if we are missing the info for a state event ID
+ foundEvent := false // true if we found a (type, state_key) match
+ // find which state ID to replace, if any
+ for i, id := range newStateIDs {
+ ev, ok := b.eventIDMap[id]
+ if !ok {
+ missingState = true
+ continue
+ }
+ // The state IDs BEFORE the target event are the state IDs BEFORE the prev_event PLUS the prev_event itself
+ if ev.Type() == prevEvent.Type() && ev.StateKeyEquals(*prevEvent.StateKey()) {
+ newStateIDs[i] = prevEvent.EventID()
+ foundEvent = true
+ break
+ }
+ }
+ if !foundEvent && !missingState {
+ // we can be certain that this is new state
+ newStateIDs = append(newStateIDs, prevEvent.EventID())
+ foundEvent = true
+ }
+
+ if foundEvent {
+ b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs
+ return newStateIDs
+ }
+ return nil
+}
+
+func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatrixserverlib.RoomVersion,
+ event gomatrixserverlib.HeaderedEvent, eventIDs []string) (map[string]*gomatrixserverlib.Event, error) {
+
+ // try to fetch the events from the database first
+ events, err := b.ProvideEvents(roomVer, eventIDs)
+ if err != nil {
+ // non-fatal, fallthrough
+ logrus.WithError(err).Info("Failed to fetch events")
+ } else {
+ logrus.Infof("Fetched %d/%d events from the database", len(events), len(eventIDs))
+ if len(events) == len(eventIDs) {
+ result := make(map[string]*gomatrixserverlib.Event)
+ for i := range events {
+ result[events[i].EventID()] = &events[i]
+ b.eventIDMap[events[i].EventID()] = events[i]
+ }
+ return result, nil
+ }
+ }
+
+ c := gomatrixserverlib.FederatedStateProvider{
+ FedClient: b.fedClient,
+ RememberAuthEvents: false,
+ Server: b.servers[0],
+ }
+ result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs)
+ if err != nil {
+ return nil, err
+ }
+ for eventID, ev := range result {
+ b.eventIDMap[eventID] = *ev
+ }
+ return result, nil
+}
+
+// ServersAtEvent is called when trying to determine which server to request from.
+// It returns a list of servers which can be queried for backfill requests. These servers
+// will be servers that are in the room already. The entries at the beginning are preferred servers
+// and will be tried first. An empty list will fail the request.
+// nolint:gocyclo
+func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID string) []gomatrixserverlib.ServerName {
+ // eventID will be a prev_event ID of a backwards extremity, meaning we will not have a database entry for it. Instead, use
+ // its successor, so look it up.
+ successor := ""
+FindSuccessor:
+ for sucID, prevEventIDs := range b.bwExtrems {
+ for _, pe := range prevEventIDs {
+ if pe == eventID {
+ successor = sucID
+ break FindSuccessor
+ }
+ }
+ }
+ if successor == "" {
+ logrus.WithField("event_id", eventID).Error("ServersAtEvent: failed to find successor of this event to determine room state")
+ return nil
+ }
+ eventID = successor
+
+ // getMembershipsBeforeEventNID requires a NID, so retrieving the NID for
+ // the event is necessary.
+ NIDs, err := b.db.EventNIDs(ctx, []string{eventID})
+ if err != nil {
+ logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get event NID for event")
+ return nil
+ }
+
+ info, err := b.db.RoomInfo(ctx, roomID)
+ if err != nil {
+ logrus.WithError(err).WithField("room_id", roomID).Error("ServersAtEvent: failed to get RoomInfo for room")
+ return nil
+ }
+ if info == nil || info.IsStub {
+ logrus.WithField("room_id", roomID).Error("ServersAtEvent: failed to get RoomInfo for room, room is missing")
+ return nil
+ }
+
+ stateEntries, err := helpers.StateBeforeEvent(ctx, b.db, *info, NIDs[eventID])
+ if err != nil {
+ logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to load state before event")
+ return nil
+ }
+
+ // possibly return all joined servers depending on history visiblity
+ memberEventsFromVis, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries)
+ if err != nil {
+ logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules")
+ return nil
+ }
+ logrus.Infof("ServersAtEvent including %d current events from history visibility", len(memberEventsFromVis))
+
+ // Retrieve all "m.room.member" state events of "join" membership, which
+ // contains the list of users in the room before the event, therefore all
+ // the servers in it at that moment.
+ memberEvents, err := helpers.GetMembershipsAtState(ctx, b.db, stateEntries, true)
+ if err != nil {
+ logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get memberships before event")
+ return nil
+ }
+ memberEvents = append(memberEvents, memberEventsFromVis...)
+
+ // Store the server names in a temporary map to avoid duplicates.
+ serverSet := make(map[gomatrixserverlib.ServerName]bool)
+ for _, event := range memberEvents {
+ serverSet[event.Origin()] = true
+ }
+ var servers []gomatrixserverlib.ServerName
+ for server := range serverSet {
+ if server == b.thisServer {
+ continue
+ }
+ servers = append(servers, server)
+ }
+ b.servers = servers
+ return servers
+}
+
+// Backfill performs a backfill request to the given server.
+// https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid
+func (b *backfillRequester) Backfill(ctx context.Context, server gomatrixserverlib.ServerName, roomID string,
+ fromEventIDs []string, limit int) (*gomatrixserverlib.Transaction, error) {
+
+ tx, err := b.fedClient.Backfill(ctx, server, roomID, limit, fromEventIDs)
+ return &tx, err
+}
+
+func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.Event, error) {
+ ctx := context.Background()
+ nidMap, err := b.db.EventNIDs(ctx, eventIDs)
+ if err != nil {
+ logrus.WithError(err).WithField("event_ids", eventIDs).Error("Failed to find events")
+ return nil, err
+ }
+ eventNIDs := make([]types.EventNID, len(nidMap))
+ i := 0
+ for _, nid := range nidMap {
+ eventNIDs[i] = nid
+ i++
+ }
+ eventsWithNids, err := b.db.Events(ctx, eventNIDs)
+ if err != nil {
+ logrus.WithError(err).WithField("event_nids", eventNIDs).Error("Failed to load events")
+ return nil, err
+ }
+ events := make([]gomatrixserverlib.Event, len(eventsWithNids))
+ for i := range eventsWithNids {
+ events[i] = eventsWithNids[i].Event
+ }
+ return events, nil
+}
+
+// joinEventsFromHistoryVisibility returns all CURRENTLY joined members if the provided state indicated a 'shared' history visibility.
+// TODO: Long term we probably want a history_visibility table which stores eventNID | visibility_enum so we can just
+// pull all events and then filter by that table.
+func joinEventsFromHistoryVisibility(
+ ctx context.Context, db storage.Database, roomID string, stateEntries []types.StateEntry) ([]types.Event, error) {
+
+ var eventNIDs []types.EventNID
+ for _, entry := range stateEntries {
+ // Filter the events to retrieve to only keep the membership events
+ if entry.EventTypeNID == types.MRoomHistoryVisibilityNID && entry.EventStateKeyNID == types.EmptyStateKeyNID {
+ eventNIDs = append(eventNIDs, entry.EventNID)
+ break
+ }
+ }
+
+ // Get all of the events in this state
+ stateEvents, err := db.Events(ctx, eventNIDs)
+ if err != nil {
+ return nil, err
+ }
+ events := make([]gomatrixserverlib.Event, len(stateEvents))
+ for i := range stateEvents {
+ events[i] = stateEvents[i].Event
+ }
+ visibility := auth.HistoryVisibilityForRoom(events)
+ if visibility != "shared" {
+ logrus.Infof("ServersAtEvent history visibility not shared: %s", visibility)
+ return nil, nil
+ }
+ // get joined members
+ info, err := db.RoomInfo(ctx, roomID)
+ if err != nil {
+ return nil, err
+ }
+ joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, true, false)
+ if err != nil {
+ return nil, err
+ }
+ return db.Events(ctx, joinEventNIDs)
+}
+
+func persistEvents(ctx context.Context, db storage.Database, events []gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) {
+ var roomNID types.RoomNID
+ backfilledEventMap := make(map[string]types.Event)
+ for j, ev := range events {
+ nidMap, err := db.EventNIDs(ctx, ev.AuthEventIDs())
+ if err != nil { // this shouldn't happen as RequestBackfill already found them
+ logrus.WithError(err).WithField("auth_events", ev.AuthEventIDs()).Error("Failed to find one or more auth events")
+ continue
+ }
+ authNids := make([]types.EventNID, len(nidMap))
+ i := 0
+ for _, nid := range nidMap {
+ authNids[i] = nid
+ i++
+ }
+ var stateAtEvent types.StateAtEvent
+ var redactedEventID string
+ var redactionEvent *gomatrixserverlib.Event
+ roomNID, stateAtEvent, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids)
+ if err != nil {
+ logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to persist event")
+ continue
+ }
+ // If storing this event results in it being redacted, then do so.
+ // It's also possible for this event to be a redaction which results in another event being
+ // redacted, which we don't care about since we aren't returning it in this backfill.
+ if redactedEventID == ev.EventID() {
+ eventToRedact := ev.Unwrap()
+ redactedEvent, err := eventutil.RedactEvent(redactionEvent, &eventToRedact)
+ if err != nil {
+ logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to redact event")
+ continue
+ }
+ ev = redactedEvent.Headered(ev.RoomVersion)
+ events[j] = ev
+ }
+ backfilledEventMap[ev.EventID()] = types.Event{
+ EventNID: stateAtEvent.StateEntry.EventNID,
+ Event: ev.Unwrap(),
+ }
+ }
+ return roomNID, backfilledEventMap
+}
diff --git a/roomserver/internal/perform/perform_invite.go b/roomserver/internal/perform/perform_invite.go
new file mode 100644
index 00000000..7320388e
--- /dev/null
+++ b/roomserver/internal/perform/perform_invite.go
@@ -0,0 +1,243 @@
+package perform
+
+import (
+ "context"
+ "fmt"
+
+ federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
+ "github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/roomserver/internal/helpers"
+ "github.com/matrix-org/dendrite/roomserver/state"
+ "github.com/matrix-org/dendrite/roomserver/storage"
+ "github.com/matrix-org/dendrite/roomserver/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ log "github.com/sirupsen/logrus"
+)
+
+type Inviter struct {
+ DB storage.Database
+ Cfg *config.RoomServer
+ FSAPI federationSenderAPI.FederationSenderInternalAPI
+
+ // TODO FIXME: Remove this
+ RSAPI api.RoomserverInternalAPI
+}
+
+// nolint:gocyclo
+func (r *Inviter) PerformInvite(
+ ctx context.Context,
+ req *api.PerformInviteRequest,
+ res *api.PerformInviteResponse,
+) ([]api.OutputEvent, error) {
+ event := req.Event
+ if event.StateKey() == nil {
+ return nil, fmt.Errorf("invite must be a state event")
+ }
+
+ roomID := event.RoomID()
+ targetUserID := *event.StateKey()
+ info, err := r.DB.RoomInfo(ctx, roomID)
+ if err != nil {
+ return nil, fmt.Errorf("Failed to load RoomInfo: %w", err)
+ }
+
+ log.WithFields(log.Fields{
+ "event_id": event.EventID(),
+ "room_id": roomID,
+ "room_version": req.RoomVersion,
+ "target_user_id": targetUserID,
+ "room_info_exists": info != nil,
+ }).Info("processing invite event")
+
+ _, domain, _ := gomatrixserverlib.SplitID('@', targetUserID)
+ isTargetLocal := domain == r.Cfg.Matrix.ServerName
+ isOriginLocal := event.Origin() == r.Cfg.Matrix.ServerName
+
+ inviteState := req.InviteRoomState
+ if len(inviteState) == 0 && info != nil {
+ var is []gomatrixserverlib.InviteV2StrippedState
+ if is, err = buildInviteStrippedState(ctx, r.DB, info, req); err == nil {
+ inviteState = is
+ }
+ }
+ if len(inviteState) == 0 {
+ if err = event.SetUnsignedField("invite_room_state", struct{}{}); err != nil {
+ return nil, fmt.Errorf("event.SetUnsignedField: %w", err)
+ }
+ } else {
+ if err = event.SetUnsignedField("invite_room_state", inviteState); err != nil {
+ return nil, fmt.Errorf("event.SetUnsignedField: %w", err)
+ }
+ }
+
+ var isAlreadyJoined bool
+ if info != nil {
+ _, isAlreadyJoined, err = r.DB.GetMembership(ctx, info.RoomNID, *event.StateKey())
+ if err != nil {
+ return nil, fmt.Errorf("r.DB.GetMembership: %w", err)
+ }
+ }
+ if isAlreadyJoined {
+ // If the user is joined to the room then that takes precedence over this
+ // invite event. It makes little sense to move a user that is already
+ // joined to the room into the invite state.
+ // This could plausibly happen if an invite request raced with a join
+ // request for a user. For example if a user was invited to a public
+ // room and they joined the room at the same time as the invite was sent.
+ // The other way this could plausibly happen is if an invite raced with
+ // a kick. For example if a user was kicked from a room in error and in
+ // response someone else in the room re-invited them then it is possible
+ // for the invite request to race with the leave event so that the
+ // target receives invite before it learns that it has been kicked.
+ // There are a few ways this could be plausibly handled in the roomserver.
+ // 1) Store the invite, but mark it as retired. That will result in the
+ // permanent rejection of that invite event. So even if the target
+ // user leaves the room and the invite is retransmitted it will be
+ // ignored. However a new invite with a new event ID would still be
+ // accepted.
+ // 2) Silently discard the invite event. This means that if the event
+ // was retransmitted at a later date after the target user had left
+ // the room we would accept the invite. However since we hadn't told
+ // the sending server that the invite had been discarded it would
+ // have no reason to attempt to retry.
+ // 3) Signal the sending server that the user is already joined to the
+ // room.
+ // For now we will implement option 2. Since in the abesence of a retry
+ // mechanism it will be equivalent to option 1, and we don't have a
+ // signalling mechanism to implement option 3.
+ res.Error = &api.PerformError{
+ Code: api.PerformErrorNotAllowed,
+ Msg: "User is already joined to room",
+ }
+ return nil, nil
+ }
+
+ if isOriginLocal {
+ // The invite originated locally. Therefore we have a responsibility to
+ // try and see if the user is allowed to make this invite. We can't do
+ // this for invites coming in over federation - we have to take those on
+ // trust.
+ _, err = helpers.CheckAuthEvents(ctx, r.DB, event, event.AuthEventIDs())
+ if err != nil {
+ log.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", event.AuthEventIDs()).Error(
+ "processInviteEvent.checkAuthEvents failed for event",
+ )
+ if _, ok := err.(*gomatrixserverlib.NotAllowed); ok {
+ res.Error = &api.PerformError{
+ Msg: err.Error(),
+ Code: api.PerformErrorNotAllowed,
+ }
+ return nil, nil
+ }
+ return nil, fmt.Errorf("checkAuthEvents: %w", err)
+ }
+
+ // If the invite originated from us and the target isn't local then we
+ // should try and send the invite over federation first. It might be
+ // that the remote user doesn't exist, in which case we can give up
+ // processing here.
+ if req.SendAsServer != api.DoNotSendToOtherServers && !isTargetLocal {
+ fsReq := &federationSenderAPI.PerformInviteRequest{
+ RoomVersion: req.RoomVersion,
+ Event: event,
+ InviteRoomState: inviteState,
+ }
+ fsRes := &federationSenderAPI.PerformInviteResponse{}
+ if err = r.FSAPI.PerformInvite(ctx, fsReq, fsRes); err != nil {
+ res.Error = &api.PerformError{
+ Msg: err.Error(),
+ Code: api.PerformErrorNoOperation,
+ }
+ log.WithError(err).WithField("event_id", event.EventID()).Error("r.FSAPI.PerformInvite failed")
+ return nil, nil
+ }
+ event = fsRes.Event
+ }
+
+ // Send the invite event to the roomserver input stream. This will
+ // notify existing users in the room about the invite, update the
+ // membership table and ensure that the event is ready and available
+ // to use as an auth event when accepting the invite.
+ inputReq := &api.InputRoomEventsRequest{
+ InputRoomEvents: []api.InputRoomEvent{
+ {
+ Kind: api.KindNew,
+ Event: event,
+ AuthEventIDs: event.AuthEventIDs(),
+ SendAsServer: req.SendAsServer,
+ },
+ },
+ }
+ inputRes := &api.InputRoomEventsResponse{}
+ if err = r.RSAPI.InputRoomEvents(context.Background(), inputReq, inputRes); err != nil {
+ return nil, fmt.Errorf("r.InputRoomEvents: %w", err)
+ }
+ } else {
+ // The invite originated over federation. Process the membership
+ // update, which will notify the sync API etc about the incoming
+ // invite.
+ updater, err := r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocal, req.RoomVersion)
+ if err != nil {
+ return nil, fmt.Errorf("r.DB.MembershipUpdater: %w", err)
+ }
+
+ unwrapped := event.Unwrap()
+ outputUpdates, err := helpers.UpdateToInviteMembership(updater, &unwrapped, nil, req.Event.RoomVersion)
+ if err != nil {
+ return nil, fmt.Errorf("updateToInviteMembership: %w", err)
+ }
+
+ if err = updater.Commit(); err != nil {
+ return nil, fmt.Errorf("updater.Commit: %w", err)
+ }
+
+ return outputUpdates, nil
+ }
+
+ return nil, nil
+}
+
+func buildInviteStrippedState(
+ ctx context.Context,
+ db storage.Database,
+ info *types.RoomInfo,
+ input *api.PerformInviteRequest,
+) ([]gomatrixserverlib.InviteV2StrippedState, error) {
+ stateWanted := []gomatrixserverlib.StateKeyTuple{}
+ // "If they are set on the room, at least the state for m.room.avatar, m.room.canonical_alias, m.room.join_rules, and m.room.name SHOULD be included."
+ // https://matrix.org/docs/spec/client_server/r0.6.0#m-room-member
+ for _, t := range []string{
+ gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomCanonicalAlias,
+ gomatrixserverlib.MRoomAliases, gomatrixserverlib.MRoomJoinRules,
+ "m.room.avatar", "m.room.encryption",
+ } {
+ stateWanted = append(stateWanted, gomatrixserverlib.StateKeyTuple{
+ EventType: t,
+ StateKey: "",
+ })
+ }
+ roomState := state.NewStateResolution(db, *info)
+ stateEntries, err := roomState.LoadStateAtSnapshotForStringTuples(
+ ctx, info.StateSnapshotNID, stateWanted,
+ )
+ if err != nil {
+ return nil, err
+ }
+ stateNIDs := []types.EventNID{}
+ for _, stateNID := range stateEntries {
+ stateNIDs = append(stateNIDs, stateNID.EventNID)
+ }
+ stateEvents, err := db.Events(ctx, stateNIDs)
+ if err != nil {
+ return nil, err
+ }
+ inviteState := []gomatrixserverlib.InviteV2StrippedState{
+ gomatrixserverlib.NewInviteV2StrippedState(&input.Event.Event),
+ }
+ stateEvents = append(stateEvents, types.Event{Event: input.Event.Unwrap()})
+ for _, event := range stateEvents {
+ inviteState = append(inviteState, gomatrixserverlib.NewInviteV2StrippedState(&event.Event))
+ }
+ return inviteState, nil
+}
diff --git a/roomserver/internal/perform/perform_join.go b/roomserver/internal/perform/perform_join.go
new file mode 100644
index 00000000..c8e6e8e6
--- /dev/null
+++ b/roomserver/internal/perform/perform_join.go
@@ -0,0 +1,308 @@
+package perform
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "strings"
+ "time"
+
+ fsAPI "github.com/matrix-org/dendrite/federationsender/api"
+ "github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/dendrite/internal/eventutil"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/roomserver/internal/helpers"
+ "github.com/matrix-org/dendrite/roomserver/storage"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/sirupsen/logrus"
+)
+
+type Joiner struct {
+ ServerName gomatrixserverlib.ServerName
+ Cfg *config.RoomServer
+ FSAPI fsAPI.FederationSenderInternalAPI
+ DB storage.Database
+
+ // TODO FIXME: Remove this
+ RSAPI api.RoomserverInternalAPI
+}
+
+// PerformJoin handles joining matrix rooms, including over federation by talking to the federationsender.
+func (r *Joiner) PerformJoin(
+ ctx context.Context,
+ req *api.PerformJoinRequest,
+ res *api.PerformJoinResponse,
+) {
+ roomID, err := r.performJoin(ctx, req)
+ if err != nil {
+ perr, ok := err.(*api.PerformError)
+ if ok {
+ res.Error = perr
+ } else {
+ res.Error = &api.PerformError{
+ Msg: err.Error(),
+ }
+ }
+ }
+ res.RoomID = roomID
+}
+
+func (r *Joiner) performJoin(
+ ctx context.Context,
+ req *api.PerformJoinRequest,
+) (string, error) {
+ _, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
+ if err != nil {
+ return "", &api.PerformError{
+ Code: api.PerformErrorBadRequest,
+ Msg: fmt.Sprintf("Supplied user ID %q in incorrect format", req.UserID),
+ }
+ }
+ if domain != r.Cfg.Matrix.ServerName {
+ return "", &api.PerformError{
+ Code: api.PerformErrorBadRequest,
+ Msg: fmt.Sprintf("User %q does not belong to this homeserver", req.UserID),
+ }
+ }
+ if strings.HasPrefix(req.RoomIDOrAlias, "!") {
+ return r.performJoinRoomByID(ctx, req)
+ }
+ if strings.HasPrefix(req.RoomIDOrAlias, "#") {
+ return r.performJoinRoomByAlias(ctx, req)
+ }
+ return "", &api.PerformError{
+ Code: api.PerformErrorBadRequest,
+ Msg: fmt.Sprintf("Room ID or alias %q is invalid", req.RoomIDOrAlias),
+ }
+}
+
+func (r *Joiner) performJoinRoomByAlias(
+ ctx context.Context,
+ req *api.PerformJoinRequest,
+) (string, error) {
+ // Get the domain part of the room alias.
+ _, domain, err := gomatrixserverlib.SplitID('#', req.RoomIDOrAlias)
+ if err != nil {
+ return "", fmt.Errorf("Alias %q is not in the correct format", req.RoomIDOrAlias)
+ }
+ req.ServerNames = append(req.ServerNames, domain)
+
+ // Check if this alias matches our own server configuration. If it
+ // doesn't then we'll need to try a federated join.
+ var roomID string
+ if domain != r.Cfg.Matrix.ServerName {
+ // The alias isn't owned by us, so we will need to try joining using
+ // a remote server.
+ dirReq := fsAPI.PerformDirectoryLookupRequest{
+ RoomAlias: req.RoomIDOrAlias, // the room alias to lookup
+ ServerName: domain, // the server to ask
+ }
+ dirRes := fsAPI.PerformDirectoryLookupResponse{}
+ err = r.FSAPI.PerformDirectoryLookup(ctx, &dirReq, &dirRes)
+ if err != nil {
+ logrus.WithError(err).Errorf("error looking up alias %q", req.RoomIDOrAlias)
+ return "", fmt.Errorf("Looking up alias %q over federation failed: %w", req.RoomIDOrAlias, err)
+ }
+ roomID = dirRes.RoomID
+ req.ServerNames = append(req.ServerNames, dirRes.ServerNames...)
+ } else {
+ // Otherwise, look up if we know this room alias locally.
+ roomID, err = r.DB.GetRoomIDForAlias(ctx, req.RoomIDOrAlias)
+ if err != nil {
+ return "", fmt.Errorf("Lookup room alias %q failed: %w", req.RoomIDOrAlias, err)
+ }
+ }
+
+ // If the room ID is empty then we failed to look up the alias.
+ if roomID == "" {
+ return "", fmt.Errorf("Alias %q not found", req.RoomIDOrAlias)
+ }
+
+ // If we do, then pluck out the room ID and continue the join.
+ req.RoomIDOrAlias = roomID
+ return r.performJoinRoomByID(ctx, req)
+}
+
+// TODO: Break this function up a bit
+// nolint:gocyclo
+func (r *Joiner) performJoinRoomByID(
+ ctx context.Context,
+ req *api.PerformJoinRequest,
+) (string, error) {
+ // Get the domain part of the room ID.
+ _, domain, err := gomatrixserverlib.SplitID('!', req.RoomIDOrAlias)
+ if err != nil {
+ return "", &api.PerformError{
+ Code: api.PerformErrorBadRequest,
+ Msg: fmt.Sprintf("Room ID %q is invalid: %s", req.RoomIDOrAlias, err),
+ }
+ }
+
+ // If the server name in the room ID isn't ours then it's a
+ // possible candidate for finding the room via federation. Add
+ // it to the list of servers to try.
+ if domain != r.Cfg.Matrix.ServerName {
+ req.ServerNames = append(req.ServerNames, domain)
+ }
+
+ // Prepare the template for the join event.
+ userID := req.UserID
+ eb := gomatrixserverlib.EventBuilder{
+ Type: gomatrixserverlib.MRoomMember,
+ Sender: userID,
+ StateKey: &userID,
+ RoomID: req.RoomIDOrAlias,
+ Redacts: "",
+ }
+ if err = eb.SetUnsigned(struct{}{}); err != nil {
+ return "", fmt.Errorf("eb.SetUnsigned: %w", err)
+ }
+
+ // It is possible for the request to include some "content" for the
+ // event. We'll always overwrite the "membership" key, but the rest,
+ // like "display_name" or "avatar_url", will be kept if supplied.
+ if req.Content == nil {
+ req.Content = map[string]interface{}{}
+ }
+ req.Content["membership"] = gomatrixserverlib.Join
+ if err = eb.SetContent(req.Content); err != nil {
+ return "", fmt.Errorf("eb.SetContent: %w", err)
+ }
+
+ // First work out if this is in response to an existing invite
+ // from a federated server. If it is then we avoid the situation
+ // where we might think we know about a room in the following
+ // section but don't know the latest state as all of our users
+ // have left.
+ serverInRoom, _ := helpers.IsServerCurrentlyInRoom(ctx, r.DB, r.ServerName, req.RoomIDOrAlias)
+ isInvitePending, inviteSender, _, err := helpers.IsInvitePending(ctx, r.DB, req.RoomIDOrAlias, req.UserID)
+ if err == nil && isInvitePending && !serverInRoom {
+ // Check if there's an invite pending.
+ _, inviterDomain, ierr := gomatrixserverlib.SplitID('@', inviteSender)
+ if ierr != nil {
+ return "", fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
+ }
+
+ // Check that the domain isn't ours. If it's local then we don't
+ // need to do anything as our own copy of the room state will be
+ // up-to-date.
+ if inviterDomain != r.Cfg.Matrix.ServerName {
+ // Add the server of the person who invited us to the server list,
+ // as they should be a fairly good bet.
+ req.ServerNames = append(req.ServerNames, inviterDomain)
+
+ // Perform a federated room join.
+ return req.RoomIDOrAlias, r.performFederatedJoinRoomByID(ctx, req)
+ }
+ }
+
+ // Try to construct an actual join event from the template.
+ // If this succeeds then it is a sign that the room already exists
+ // locally on the homeserver.
+ // TODO: Check what happens if the room exists on the server
+ // but everyone has since left. I suspect it does the wrong thing.
+ buildRes := api.QueryLatestEventsAndStateResponse{}
+ event, err := eventutil.BuildEvent(
+ ctx, // the request context
+ &eb, // the template join event
+ r.Cfg.Matrix, // the server configuration
+ time.Now(), // the event timestamp to use
+ r.RSAPI, // the roomserver API to use
+ &buildRes, // the query response
+ )
+
+ switch err {
+ case nil:
+ // The room join is local. Send the new join event into the
+ // roomserver. First of all check that the user isn't already
+ // a member of the room.
+ alreadyJoined := false
+ for _, se := range buildRes.StateEvents {
+ if !se.StateKeyEquals(userID) {
+ continue
+ }
+ if membership, merr := se.Membership(); merr == nil {
+ alreadyJoined = (membership == gomatrixserverlib.Join)
+ break
+ }
+ }
+
+ // If we haven't already joined the room then send an event
+ // into the room changing our membership status.
+ if !alreadyJoined {
+ inputReq := api.InputRoomEventsRequest{
+ InputRoomEvents: []api.InputRoomEvent{
+ {
+ Kind: api.KindNew,
+ Event: event.Headered(buildRes.RoomVersion),
+ AuthEventIDs: event.AuthEventIDs(),
+ SendAsServer: string(r.Cfg.Matrix.ServerName),
+ },
+ },
+ }
+ inputRes := api.InputRoomEventsResponse{}
+ if err = r.RSAPI.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil {
+ var notAllowed *gomatrixserverlib.NotAllowed
+ if errors.As(err, &notAllowed) {
+ return "", &api.PerformError{
+ Code: api.PerformErrorNotAllowed,
+ Msg: fmt.Sprintf("InputRoomEvents auth failed: %s", err),
+ }
+ }
+ return "", fmt.Errorf("r.InputRoomEvents: %w", err)
+ }
+ }
+
+ case eventutil.ErrRoomNoExists:
+ // The room doesn't exist locally. If the room ID looks like it should
+ // be ours then this probably means that we've nuked our database at
+ // some point.
+ if domain == r.Cfg.Matrix.ServerName {
+ // If there are no more server names to try then give up here.
+ // Otherwise we'll try a federated join as normal, since it's quite
+ // possible that the room still exists on other servers.
+ if len(req.ServerNames) == 0 {
+ return "", &api.PerformError{
+ Code: api.PerformErrorNoRoom,
+ Msg: fmt.Sprintf("Room ID %q does not exist", req.RoomIDOrAlias),
+ }
+ }
+ }
+
+ // Perform a federated room join.
+ return req.RoomIDOrAlias, r.performFederatedJoinRoomByID(ctx, req)
+
+ default:
+ // Something else went wrong.
+ return "", fmt.Errorf("Error joining local room: %q", err)
+ }
+
+ // By this point, if req.RoomIDOrAlias contained an alias, then
+ // it will have been overwritten with a room ID by performJoinRoomByAlias.
+ // We should now include this in the response so that the CS API can
+ // return the right room ID.
+ return req.RoomIDOrAlias, nil
+}
+
+func (r *Joiner) performFederatedJoinRoomByID(
+ ctx context.Context,
+ req *api.PerformJoinRequest,
+) error {
+ // Try joining by all of the supplied server names.
+ fedReq := fsAPI.PerformJoinRequest{
+ RoomID: req.RoomIDOrAlias, // the room ID to try and join
+ UserID: req.UserID, // the user ID joining the room
+ ServerNames: req.ServerNames, // the server to try joining with
+ Content: req.Content, // the membership event content
+ }
+ fedRes := fsAPI.PerformJoinResponse{}
+ r.FSAPI.PerformJoin(ctx, &fedReq, &fedRes)
+ if fedRes.LastError != nil {
+ return &api.PerformError{
+ Code: api.PerformErrRemote,
+ Msg: fedRes.LastError.Message,
+ RemoteCode: fedRes.LastError.Code,
+ }
+ }
+ return nil
+}
diff --git a/roomserver/internal/perform/perform_leave.go b/roomserver/internal/perform/perform_leave.go
new file mode 100644
index 00000000..b4053eed
--- /dev/null
+++ b/roomserver/internal/perform/perform_leave.go
@@ -0,0 +1,179 @@
+package perform
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "time"
+
+ fsAPI "github.com/matrix-org/dendrite/federationsender/api"
+ "github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/dendrite/internal/eventutil"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/roomserver/internal/helpers"
+ "github.com/matrix-org/dendrite/roomserver/storage"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type Leaver struct {
+ Cfg *config.RoomServer
+ DB storage.Database
+ FSAPI fsAPI.FederationSenderInternalAPI
+
+ // TODO FIXME: Remove this
+ RSAPI api.RoomserverInternalAPI
+}
+
+// WriteOutputEvents implements OutputRoomEventWriter
+func (r *Leaver) PerformLeave(
+ ctx context.Context,
+ req *api.PerformLeaveRequest,
+ res *api.PerformLeaveResponse,
+) ([]api.OutputEvent, error) {
+ _, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
+ if err != nil {
+ return nil, fmt.Errorf("Supplied user ID %q in incorrect format", req.UserID)
+ }
+ if domain != r.Cfg.Matrix.ServerName {
+ return nil, fmt.Errorf("User %q does not belong to this homeserver", req.UserID)
+ }
+ if strings.HasPrefix(req.RoomID, "!") {
+ return r.performLeaveRoomByID(ctx, req, res)
+ }
+ return nil, fmt.Errorf("Room ID %q is invalid", req.RoomID)
+}
+
+func (r *Leaver) performLeaveRoomByID(
+ ctx context.Context,
+ req *api.PerformLeaveRequest,
+ res *api.PerformLeaveResponse, // nolint:unparam
+) ([]api.OutputEvent, error) {
+ // If there's an invite outstanding for the room then respond to
+ // that.
+ isInvitePending, senderUser, eventID, err := helpers.IsInvitePending(ctx, r.DB, req.RoomID, req.UserID)
+ if err == nil && isInvitePending {
+ return r.performRejectInvite(ctx, req, res, senderUser, eventID)
+ }
+
+ // There's no invite pending, so first of all we want to find out
+ // if the room exists and if the user is actually in it.
+ latestReq := api.QueryLatestEventsAndStateRequest{
+ RoomID: req.RoomID,
+ StateToFetch: []gomatrixserverlib.StateKeyTuple{
+ {
+ EventType: gomatrixserverlib.MRoomMember,
+ StateKey: req.UserID,
+ },
+ },
+ }
+ latestRes := api.QueryLatestEventsAndStateResponse{}
+ if err = r.RSAPI.QueryLatestEventsAndState(ctx, &latestReq, &latestRes); err != nil {
+ return nil, err
+ }
+ if !latestRes.RoomExists {
+ return nil, fmt.Errorf("Room %q does not exist", req.RoomID)
+ }
+
+ // Now let's see if the user is in the room.
+ if len(latestRes.StateEvents) == 0 {
+ return nil, fmt.Errorf("User %q is not a member of room %q", req.UserID, req.RoomID)
+ }
+ membership, err := latestRes.StateEvents[0].Membership()
+ if err != nil {
+ return nil, fmt.Errorf("Error getting membership: %w", err)
+ }
+ if membership != gomatrixserverlib.Join {
+ // TODO: should be able to handle "invite" in this case too, if
+ // it's a case of kicking or banning or such
+ return nil, fmt.Errorf("User %q is not joined to the room (membership is %q)", req.UserID, membership)
+ }
+
+ // Prepare the template for the leave event.
+ userID := req.UserID
+ eb := gomatrixserverlib.EventBuilder{
+ Type: gomatrixserverlib.MRoomMember,
+ Sender: userID,
+ StateKey: &userID,
+ RoomID: req.RoomID,
+ Redacts: "",
+ }
+ if err = eb.SetContent(map[string]interface{}{"membership": "leave"}); err != nil {
+ return nil, fmt.Errorf("eb.SetContent: %w", err)
+ }
+ if err = eb.SetUnsigned(struct{}{}); err != nil {
+ return nil, fmt.Errorf("eb.SetUnsigned: %w", err)
+ }
+
+ // We know that the user is in the room at this point so let's build
+ // a leave event.
+ // TODO: Check what happens if the room exists on the server
+ // but everyone has since left. I suspect it does the wrong thing.
+ buildRes := api.QueryLatestEventsAndStateResponse{}
+ event, err := eventutil.BuildEvent(
+ ctx, // the request context
+ &eb, // the template leave event
+ r.Cfg.Matrix, // the server configuration
+ time.Now(), // the event timestamp to use
+ r.RSAPI, // the roomserver API to use
+ &buildRes, // the query response
+ )
+ if err != nil {
+ return nil, fmt.Errorf("eventutil.BuildEvent: %w", err)
+ }
+
+ // Give our leave event to the roomserver input stream. The
+ // roomserver will process the membership change and notify
+ // downstream automatically.
+ inputReq := api.InputRoomEventsRequest{
+ InputRoomEvents: []api.InputRoomEvent{
+ {
+ Kind: api.KindNew,
+ Event: event.Headered(buildRes.RoomVersion),
+ AuthEventIDs: event.AuthEventIDs(),
+ SendAsServer: string(r.Cfg.Matrix.ServerName),
+ },
+ },
+ }
+ inputRes := api.InputRoomEventsResponse{}
+ if err = r.RSAPI.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil {
+ return nil, fmt.Errorf("r.InputRoomEvents: %w", err)
+ }
+
+ return nil, nil
+}
+
+func (r *Leaver) performRejectInvite(
+ ctx context.Context,
+ req *api.PerformLeaveRequest,
+ res *api.PerformLeaveResponse, // nolint:unparam
+ senderUser, eventID string,
+) ([]api.OutputEvent, error) {
+ _, domain, err := gomatrixserverlib.SplitID('@', senderUser)
+ if err != nil {
+ return nil, fmt.Errorf("User ID %q invalid: %w", senderUser, err)
+ }
+
+ // Ask the federation sender to perform a federated leave for us.
+ leaveReq := fsAPI.PerformLeaveRequest{
+ RoomID: req.RoomID,
+ UserID: req.UserID,
+ ServerNames: []gomatrixserverlib.ServerName{domain},
+ }
+ leaveRes := fsAPI.PerformLeaveResponse{}
+ if err := r.FSAPI.PerformLeave(ctx, &leaveReq, &leaveRes); err != nil {
+ return nil, err
+ }
+
+ // Withdraw the invite, so that the sync API etc are
+ // notified that we rejected it.
+ return []api.OutputEvent{
+ {
+ Type: api.OutputTypeRetireInviteEvent,
+ RetireInviteEvent: &api.OutputRetireInviteEvent{
+ EventID: eventID,
+ Membership: "leave",
+ TargetUserID: req.UserID,
+ },
+ },
+ }, nil
+}
diff --git a/roomserver/internal/perform/perform_publish.go b/roomserver/internal/perform/perform_publish.go
new file mode 100644
index 00000000..aab282f3
--- /dev/null
+++ b/roomserver/internal/perform/perform_publish.go
@@ -0,0 +1,25 @@
+package perform
+
+import (
+ "context"
+
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/roomserver/storage"
+)
+
+type Publisher struct {
+ DB storage.Database
+}
+
+func (r *Publisher) PerformPublish(
+ ctx context.Context,
+ req *api.PerformPublishRequest,
+ res *api.PerformPublishResponse,
+) {
+ err := r.DB.PublishRoom(ctx, req.RoomID, req.Visibility == "public")
+ if err != nil {
+ res.Error = &api.PerformError{
+ Msg: err.Error(),
+ }
+ }
+}