diff options
author | kegsay <kegan@matrix.org> | 2022-05-17 13:23:35 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-17 13:23:35 +0100 |
commit | 6de29c1cd23d218f04d2e570932db8967d6adc4f (patch) | |
tree | b95fa478ef9ecd2c21963868a3626063bdff7cbc /federationapi | |
parent | cd82460513d5abf04e56c01667d56499d4c354be (diff) |
bugfix: E2EE device keys could sometimes not be sent to remote servers (#2466)
* Fix flakey sytest 'Local device key changes get to remote servers'
* Debug logs
* Remove internal/test and use /test only
Remove a lot of ancient code too.
* Use FederationRoomserverAPI in more places
* Use more interfaces in federationapi; begin adding regression test
* Linting
* Add regression test
* Unbreak tests
* ALL THE LOGS
* Fix a race condition which could cause events to not be sent to servers
If a new room event which rewrites state arrives, we remove all joined hosts
then re-calculate them. This wasn't done in a transaction so for a brief period
we would have no joined hosts. During this interim, key change events which arrive
would not be sent to destination servers. This would sporadically fail on sytest.
* Unbreak new tests
* Linting
Diffstat (limited to 'federationapi')
-rw-r--r-- | federationapi/api/api.go | 40 | ||||
-rw-r--r-- | federationapi/consumers/keychange.go | 8 | ||||
-rw-r--r-- | federationapi/consumers/roomserver.go | 29 | ||||
-rw-r--r-- | federationapi/federationapi.go | 4 | ||||
-rw-r--r-- | federationapi/federationapi_test.go | 236 | ||||
-rw-r--r-- | federationapi/internal/api.go | 8 | ||||
-rw-r--r-- | federationapi/internal/perform.go | 18 | ||||
-rw-r--r-- | federationapi/queue/destinationqueue.go | 31 | ||||
-rw-r--r-- | federationapi/queue/queue.go | 9 | ||||
-rw-r--r-- | federationapi/routing/query.go | 2 | ||||
-rw-r--r-- | federationapi/routing/routing.go | 2 | ||||
-rw-r--r-- | federationapi/routing/send.go | 2 | ||||
-rw-r--r-- | federationapi/routing/send_test.go | 2 | ||||
-rw-r--r-- | federationapi/routing/threepid.go | 16 | ||||
-rw-r--r-- | federationapi/storage/interface.go | 3 | ||||
-rw-r--r-- | federationapi/storage/postgres/joined_hosts_table.go | 5 | ||||
-rw-r--r-- | federationapi/storage/shared/storage.go | 26 |
17 files changed, 362 insertions, 79 deletions
diff --git a/federationapi/api/api.go b/federationapi/api/api.go index fc25194e..53d4701f 100644 --- a/federationapi/api/api.go +++ b/federationapi/api/api.go @@ -12,12 +12,16 @@ import ( // FederationInternalAPI is used to query information from the federation sender. type FederationInternalAPI interface { - FederationClient + gomatrixserverlib.FederatedStateClient + KeyserverFederationAPI gomatrixserverlib.KeyDatabase ClientFederationAPI RoomserverFederationAPI QueryServerKeys(ctx context.Context, request *QueryServerKeysRequest, response *QueryServerKeysResponse) error + LookupServerKeys(ctx context.Context, s gomatrixserverlib.ServerName, keyRequests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) ([]gomatrixserverlib.ServerKeys, error) + MSC2836EventRelationships(ctx context.Context, dst gomatrixserverlib.ServerName, r gomatrixserverlib.MSC2836EventRelationshipsRequest, roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.MSC2836EventRelationshipsResponse, err error) + MSC2946Spaces(ctx context.Context, dst gomatrixserverlib.ServerName, roomID string, suggestedOnly bool) (res gomatrixserverlib.MSC2946SpacesResponse, err error) // Broadcasts an EDU to all servers in rooms we are joined to. Used in the yggdrasil demos. PerformBroadcastEDU( @@ -60,17 +64,43 @@ type RoomserverFederationAPI interface { LookupMissingEvents(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents, roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error) } -// FederationClient is a subset of gomatrixserverlib.FederationClient functions which the fedsender +// KeyserverFederationAPI is a subset of gomatrixserverlib.FederationClient functions which the keyserver // implements as proxy calls, with built-in backoff/retries/etc. Errors returned from functions in // this interface are of type FederationClientError -type FederationClient interface { - gomatrixserverlib.FederatedStateClient +type KeyserverFederationAPI interface { GetUserDevices(ctx context.Context, s gomatrixserverlib.ServerName, userID string) (res gomatrixserverlib.RespUserDevices, err error) ClaimKeys(ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string) (res gomatrixserverlib.RespClaimKeys, err error) QueryKeys(ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string) (res gomatrixserverlib.RespQueryKeys, err error) +} + +// an interface for gmsl.FederationClient - contains functions called by federationapi only. +type FederationClient interface { + gomatrixserverlib.KeyClient + SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error) + + // Perform operations + LookupRoomAlias(ctx context.Context, s gomatrixserverlib.ServerName, roomAlias string) (res gomatrixserverlib.RespDirectory, err error) + Peek(ctx context.Context, s gomatrixserverlib.ServerName, roomID, peekID string, roomVersions []gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespPeek, err error) + MakeJoin(ctx context.Context, s gomatrixserverlib.ServerName, roomID, userID string, roomVersions []gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMakeJoin, err error) + SendJoin(ctx context.Context, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (res gomatrixserverlib.RespSendJoin, err error) + MakeLeave(ctx context.Context, s gomatrixserverlib.ServerName, roomID, userID string) (res gomatrixserverlib.RespMakeLeave, err error) + SendLeave(ctx context.Context, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (err error) + SendInviteV2(ctx context.Context, s gomatrixserverlib.ServerName, request gomatrixserverlib.InviteV2Request) (res gomatrixserverlib.RespInviteV2, err error) + + GetEvent(ctx context.Context, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error) + + GetEventAuth(ctx context.Context, s gomatrixserverlib.ServerName, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (res gomatrixserverlib.RespEventAuth, err error) + GetUserDevices(ctx context.Context, s gomatrixserverlib.ServerName, userID string) (gomatrixserverlib.RespUserDevices, error) + ClaimKeys(ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string) (gomatrixserverlib.RespClaimKeys, error) + QueryKeys(ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string) (gomatrixserverlib.RespQueryKeys, error) + Backfill(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, limit int, eventIDs []string) (res gomatrixserverlib.Transaction, err error) MSC2836EventRelationships(ctx context.Context, dst gomatrixserverlib.ServerName, r gomatrixserverlib.MSC2836EventRelationshipsRequest, roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.MSC2836EventRelationshipsResponse, err error) MSC2946Spaces(ctx context.Context, dst gomatrixserverlib.ServerName, roomID string, suggestedOnly bool) (res gomatrixserverlib.MSC2946SpacesResponse, err error) - LookupServerKeys(ctx context.Context, s gomatrixserverlib.ServerName, keyRequests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) ([]gomatrixserverlib.ServerKeys, error) + + ExchangeThirdPartyInvite(ctx context.Context, s gomatrixserverlib.ServerName, builder gomatrixserverlib.EventBuilder) (err error) + LookupState(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string, roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespState, err error) + LookupStateIDs(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string) (res gomatrixserverlib.RespStateIDs, err error) + LookupMissingEvents(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents, roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error) } // FederationClientError is returned from FederationClient methods in the event of a problem. diff --git a/federationapi/consumers/keychange.go b/federationapi/consumers/keychange.go index 0ece18e9..95c9a7fd 100644 --- a/federationapi/consumers/keychange.go +++ b/federationapi/consumers/keychange.go @@ -39,7 +39,7 @@ type KeyChangeConsumer struct { db storage.Database queues *queue.OutgoingQueues serverName gomatrixserverlib.ServerName - rsAPI roomserverAPI.RoomserverInternalAPI + rsAPI roomserverAPI.FederationRoomserverAPI topic string } @@ -50,7 +50,7 @@ func NewKeyChangeConsumer( js nats.JetStreamContext, queues *queue.OutgoingQueues, store storage.Database, - rsAPI roomserverAPI.RoomserverInternalAPI, + rsAPI roomserverAPI.FederationRoomserverAPI, ) *KeyChangeConsumer { return &KeyChangeConsumer{ ctx: process.Context(), @@ -120,6 +120,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool { logger.WithError(err).Error("failed to calculate joined rooms for user") return true } + logrus.Infof("DEBUG: %v joined rooms for user %v", queryRes.RoomIDs, m.UserID) // send this key change to all servers who share rooms with this user. destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true) if err != nil { @@ -128,6 +129,9 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool { } if len(destinations) == 0 { + logger.WithField("num_rooms", len(queryRes.RoomIDs)).Debug("user is in no federated rooms") + destinations, err = t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, false) + logrus.Infof("GetJoinedHostsForRooms exclude self=false -> %v %v", destinations, err) return true } // Pack the EDU and marshal it diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index 80317ee6..7a0816ff 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/federationapi/queue" @@ -36,7 +37,7 @@ import ( type OutputRoomEventConsumer struct { ctx context.Context cfg *config.FederationAPI - rsAPI api.RoomserverInternalAPI + rsAPI api.FederationRoomserverAPI jetstream nats.JetStreamContext durable string db storage.Database @@ -51,7 +52,7 @@ func NewOutputRoomEventConsumer( js nats.JetStreamContext, queues *queue.OutgoingQueues, store storage.Database, - rsAPI api.RoomserverInternalAPI, + rsAPI api.FederationRoomserverAPI, ) *OutputRoomEventConsumer { return &OutputRoomEventConsumer{ ctx: process.Context(), @@ -89,15 +90,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) switch output.Type { case api.OutputTypeNewRoomEvent: ev := output.NewRoomEvent.Event - - if output.NewRoomEvent.RewritesState { - if err := s.db.PurgeRoomState(s.ctx, ev.RoomID()); err != nil { - log.WithError(err).Errorf("roomserver output log: purge room state failure") - return false - } - } - - if err := s.processMessage(*output.NewRoomEvent); err != nil { + if err := s.processMessage(*output.NewRoomEvent, output.NewRoomEvent.RewritesState); err != nil { // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ "event_id": ev.EventID(), @@ -145,7 +138,7 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee // processMessage updates the list of currently joined hosts in the room // and then sends the event to the hosts that were joined before the event. -func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { +func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent, rewritesState bool) error { addsStateEvents, missingEventIDs := ore.NeededStateEventIDs() // Ask the roomserver and add in the rest of the results into the set. @@ -164,7 +157,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err addsStateEvents = append(addsStateEvents, eventsRes.Events...) } - addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(addsStateEvents)) + addsJoinedHosts, err := JoinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(addsStateEvents)) if err != nil { return err } @@ -173,13 +166,13 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err // expressed as a delta against the current state. // TODO(#290): handle EventIDMismatchError and recover the current state by // talking to the roomserver + logrus.Infof("room %s adds joined hosts: %v removes %v", ore.Event.RoomID(), addsJoinedHosts, ore.RemovesStateEventIDs) oldJoinedHosts, err := s.db.UpdateRoom( s.ctx, ore.Event.RoomID(), - ore.LastSentEventID, - ore.Event.EventID(), addsJoinedHosts, ore.RemovesStateEventIDs, + rewritesState, // if we're re-writing state, nuke all joined hosts before adding ) if err != nil { return err @@ -238,7 +231,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent( return nil, err } - combinedAddsJoinedHosts, err := joinedHostsFromEvents(combinedAddsEvents) + combinedAddsJoinedHosts, err := JoinedHostsFromEvents(combinedAddsEvents) if err != nil { return nil, err } @@ -284,10 +277,10 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent( return result, nil } -// joinedHostsFromEvents turns a list of state events into a list of joined hosts. +// JoinedHostsFromEvents turns a list of state events into a list of joined hosts. // This errors if one of the events was invalid. // It should be impossible for an invalid event to get this far in the pipeline. -func joinedHostsFromEvents(evs []*gomatrixserverlib.Event) ([]types.JoinedHost, error) { +func JoinedHostsFromEvents(evs []*gomatrixserverlib.Event) ([]types.JoinedHost, error) { var joinedHosts []types.JoinedHost for _, ev := range evs { if ev.Type() != "m.room.member" || ev.StateKey() == nil { diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index bec9ac77..ff159bee 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -93,8 +93,8 @@ func AddPublicRoutes( // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( base *base.BaseDendrite, - federation *gomatrixserverlib.FederationClient, - rsAPI roomserverAPI.RoomserverInternalAPI, + federation api.FederationClient, + rsAPI roomserverAPI.FederationRoomserverAPI, caches *caching.Caches, keyRing *gomatrixserverlib.KeyRing, resetBlacklist bool, diff --git a/federationapi/federationapi_test.go b/federationapi/federationapi_test.go index eedebc6c..ae244c56 100644 --- a/federationapi/federationapi_test.go +++ b/federationapi/federationapi_test.go @@ -3,18 +3,250 @@ package federationapi_test import ( "context" "crypto/ed25519" + "encoding/json" + "fmt" "strings" "testing" + "time" "github.com/matrix-org/dendrite/federationapi" + "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/internal" - "github.com/matrix-org/dendrite/internal/test" + keyapi "github.com/matrix-org/dendrite/keyserver/api" + rsapi "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/test" + "github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" ) +type fedRoomserverAPI struct { + rsapi.FederationRoomserverAPI + inputRoomEvents func(ctx context.Context, req *rsapi.InputRoomEventsRequest, res *rsapi.InputRoomEventsResponse) + queryRoomsForUser func(ctx context.Context, req *rsapi.QueryRoomsForUserRequest, res *rsapi.QueryRoomsForUserResponse) error +} + +// PerformJoin will call this function +func (f *fedRoomserverAPI) InputRoomEvents(ctx context.Context, req *rsapi.InputRoomEventsRequest, res *rsapi.InputRoomEventsResponse) { + if f.inputRoomEvents == nil { + return + } + f.inputRoomEvents(ctx, req, res) +} + +// keychange consumer calls this +func (f *fedRoomserverAPI) QueryRoomsForUser(ctx context.Context, req *rsapi.QueryRoomsForUserRequest, res *rsapi.QueryRoomsForUserResponse) error { + if f.queryRoomsForUser == nil { + return nil + } + return f.queryRoomsForUser(ctx, req, res) +} + +// TODO: This struct isn't generic, only works for TestFederationAPIJoinThenKeyUpdate +type fedClient struct { + api.FederationClient + allowJoins []*test.Room + keys map[gomatrixserverlib.ServerName]struct { + key ed25519.PrivateKey + keyID gomatrixserverlib.KeyID + } + t *testing.T + sentTxn bool +} + +func (f *fedClient) GetServerKeys(ctx context.Context, matrixServer gomatrixserverlib.ServerName) (gomatrixserverlib.ServerKeys, error) { + fmt.Println("GetServerKeys:", matrixServer) + var keys gomatrixserverlib.ServerKeys + var keyID gomatrixserverlib.KeyID + var pkey ed25519.PrivateKey + for srv, data := range f.keys { + if srv == matrixServer { + pkey = data.key + keyID = data.keyID + break + } + } + if pkey == nil { + return keys, nil + } + + keys.ServerName = matrixServer + keys.ValidUntilTS = gomatrixserverlib.AsTimestamp(time.Now().Add(10 * time.Hour)) + publicKey := pkey.Public().(ed25519.PublicKey) + keys.VerifyKeys = map[gomatrixserverlib.KeyID]gomatrixserverlib.VerifyKey{ + keyID: { + Key: gomatrixserverlib.Base64Bytes(publicKey), + }, + } + toSign, err := json.Marshal(keys.ServerKeyFields) + if err != nil { + return keys, err + } + + keys.Raw, err = gomatrixserverlib.SignJSON( + string(matrixServer), keyID, pkey, toSign, + ) + if err != nil { + return keys, err + } + + return keys, nil +} + +func (f *fedClient) MakeJoin(ctx context.Context, s gomatrixserverlib.ServerName, roomID, userID string, roomVersions []gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMakeJoin, err error) { + for _, r := range f.allowJoins { + if r.ID == roomID { + res.RoomVersion = r.Version + res.JoinEvent = gomatrixserverlib.EventBuilder{ + Sender: userID, + RoomID: roomID, + Type: "m.room.member", + StateKey: &userID, + Content: gomatrixserverlib.RawJSON([]byte(`{"membership":"join"}`)), + PrevEvents: r.ForwardExtremities(), + } + var needed gomatrixserverlib.StateNeeded + needed, err = gomatrixserverlib.StateNeededForEventBuilder(&res.JoinEvent) + if err != nil { + f.t.Errorf("StateNeededForEventBuilder: %v", err) + return + } + res.JoinEvent.AuthEvents = r.MustGetAuthEventRefsForEvent(f.t, needed) + return + } + } + return +} +func (f *fedClient) SendJoin(ctx context.Context, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (res gomatrixserverlib.RespSendJoin, err error) { + for _, r := range f.allowJoins { + if r.ID == event.RoomID() { + r.InsertEvent(f.t, event.Headered(r.Version)) + f.t.Logf("Join event: %v", event.EventID()) + res.StateEvents = gomatrixserverlib.NewEventJSONsFromHeaderedEvents(r.CurrentState()) + res.AuthEvents = gomatrixserverlib.NewEventJSONsFromHeaderedEvents(r.Events()) + } + } + return +} + +func (f *fedClient) SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error) { + for _, edu := range t.EDUs { + if edu.Type == gomatrixserverlib.MDeviceListUpdate { + f.sentTxn = true + } + } + f.t.Logf("got /send") + return +} + +// Regression test to make sure that /send_join is updating the destination hosts synchronously and +// isn't relying on the roomserver. +func TestFederationAPIJoinThenKeyUpdate(t *testing.T) { + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + testFederationAPIJoinThenKeyUpdate(t, dbType) + }) +} + +func testFederationAPIJoinThenKeyUpdate(t *testing.T, dbType test.DBType) { + base, close := testrig.CreateBaseDendrite(t, dbType) + base.Cfg.FederationAPI.PreferDirectFetch = true + defer close() + jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) + + serverA := gomatrixserverlib.ServerName("server.a") + serverAKeyID := gomatrixserverlib.KeyID("ed25519:servera") + serverAPrivKey := test.PrivateKeyA + creator := test.NewUser(t, test.WithSigningServer(serverA, serverAKeyID, serverAPrivKey)) + + myServer := base.Cfg.Global.ServerName + myServerKeyID := base.Cfg.Global.KeyID + myServerPrivKey := base.Cfg.Global.PrivateKey + joiningUser := test.NewUser(t, test.WithSigningServer(myServer, myServerKeyID, myServerPrivKey)) + fmt.Printf("creator: %v joining user: %v\n", creator.ID, joiningUser.ID) + room := test.NewRoom(t, creator) + + rsapi := &fedRoomserverAPI{ + inputRoomEvents: func(ctx context.Context, req *rsapi.InputRoomEventsRequest, res *rsapi.InputRoomEventsResponse) { + if req.Asynchronous { + t.Errorf("InputRoomEvents from PerformJoin MUST be synchronous") + } + }, + queryRoomsForUser: func(ctx context.Context, req *rsapi.QueryRoomsForUserRequest, res *rsapi.QueryRoomsForUserResponse) error { + if req.UserID == joiningUser.ID && req.WantMembership == "join" { + res.RoomIDs = []string{room.ID} + return nil + } + return fmt.Errorf("unexpected queryRoomsForUser: %+v", *req) + }, + } + fc := &fedClient{ + allowJoins: []*test.Room{room}, + t: t, + keys: map[gomatrixserverlib.ServerName]struct { + key ed25519.PrivateKey + keyID gomatrixserverlib.KeyID + }{ + serverA: { + key: serverAPrivKey, + keyID: serverAKeyID, + }, + myServer: { + key: myServerPrivKey, + keyID: myServerKeyID, + }, + }, + } + fsapi := federationapi.NewInternalAPI(base, fc, rsapi, base.Caches, nil, false) + + var resp api.PerformJoinResponse + fsapi.PerformJoin(context.Background(), &api.PerformJoinRequest{ + RoomID: room.ID, + UserID: joiningUser.ID, + ServerNames: []gomatrixserverlib.ServerName{serverA}, + }, &resp) + if resp.JoinedVia != serverA { + t.Errorf("PerformJoin: joined via %v want %v", resp.JoinedVia, serverA) + } + if resp.LastError != nil { + t.Fatalf("PerformJoin: returned error: %+v", *resp.LastError) + } + + // Inject a keyserver key change event and ensure we try to send it out. If we don't, then the + // federationapi is incorrectly waiting for an output room event to arrive to update the joined + // hosts table. + key := keyapi.DeviceMessage{ + Type: keyapi.TypeDeviceKeyUpdate, + DeviceKeys: &keyapi.DeviceKeys{ + UserID: joiningUser.ID, + DeviceID: "MY_DEVICE", + DisplayName: "BLARGLE", + KeyJSON: []byte(`{}`), + }, + } + b, err := json.Marshal(key) + if err != nil { + t.Fatalf("Failed to marshal device message: %s", err) + } + + msg := &nats.Msg{ + Subject: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), + Header: nats.Header{}, + Data: b, + } + msg.Header.Set(jetstream.UserID, key.UserID) + + testrig.MustPublishMsgs(t, jsctx, msg) + time.Sleep(500 * time.Millisecond) + if !fc.sentTxn { + t.Fatalf("did not send device list update") + } +} + // Tests that event IDs with '/' in them (escaped as %2F) are correctly passed to the right handler and don't 404. // Relevant for v3 rooms and a cause of flakey sytests as the IDs are randomly generated. func TestRoomsV3URLEscapeDoNot404(t *testing.T) { @@ -86,7 +318,7 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) { } gerr, ok := err.(gomatrix.HTTPError) if !ok { - t.Errorf("failed to cast response error as gomatrix.HTTPError") + t.Errorf("failed to cast response error as gomatrix.HTTPError: %s", err) continue } t.Logf("Error: %+v", gerr) diff --git a/federationapi/internal/api.go b/federationapi/internal/api.go index 4e9fa841..14056eaf 100644 --- a/federationapi/internal/api.go +++ b/federationapi/internal/api.go @@ -25,8 +25,8 @@ type FederationInternalAPI struct { db storage.Database cfg *config.FederationAPI statistics *statistics.Statistics - rsAPI roomserverAPI.RoomserverInternalAPI - federation *gomatrixserverlib.FederationClient + rsAPI roomserverAPI.FederationRoomserverAPI + federation api.FederationClient keyRing *gomatrixserverlib.KeyRing queues *queue.OutgoingQueues joins sync.Map // joins currently in progress @@ -34,8 +34,8 @@ type FederationInternalAPI struct { func NewFederationInternalAPI( db storage.Database, cfg *config.FederationAPI, - rsAPI roomserverAPI.RoomserverInternalAPI, - federation *gomatrixserverlib.FederationClient, + rsAPI roomserverAPI.FederationRoomserverAPI, + federation api.FederationClient, statistics *statistics.Statistics, caches *caching.Caches, queues *queue.OutgoingQueues, diff --git a/federationapi/internal/perform.go b/federationapi/internal/perform.go index 577cb70e..7ccd68ef 100644 --- a/federationapi/internal/perform.go +++ b/federationapi/internal/perform.go @@ -8,6 +8,7 @@ import ( "time" "github.com/matrix-org/dendrite/federationapi/api" + "github.com/matrix-org/dendrite/federationapi/consumers" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/version" "github.com/matrix-org/gomatrix" @@ -235,6 +236,21 @@ func (r *FederationInternalAPI) performJoinUsingServer( return fmt.Errorf("respSendJoin.Check: %w", err) } + // We need to immediately update our list of joined hosts for this room now as we are technically + // joined. We must do this synchronously: we cannot rely on the roomserver output events as they + // will happen asyncly. If we don't update this table, you can end up with bad failure modes like + // joining a room, waiting for 200 OK then changing device keys and have those keys not be sent + // to other servers (this was a cause of a flakey sytest "Local device key changes get to remote servers") + // The events are trusted now as we performed auth checks above. + joinedHosts, err := consumers.JoinedHostsFromEvents(respState.StateEvents.TrustedEvents(respMakeJoin.RoomVersion, false)) + if err != nil { + return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err) + } + logrus.WithField("hosts", joinedHosts).WithField("room", roomID).Info("Joined federated room with hosts") + if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil { + return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err) + } + // If we successfully performed a send_join above then the other // server now thinks we're a part of the room. Send the newly // returned state to the roomserver to update our local view. @@ -650,7 +666,7 @@ func setDefaultRoomVersionFromJoinEvent(joinEvent gomatrixserverlib.EventBuilder // FederatedAuthProvider is an auth chain provider which fetches events from the server provided func federatedAuthProvider( - ctx context.Context, federation *gomatrixserverlib.FederationClient, + ctx context.Context, federation api.FederationClient, keyRing gomatrixserverlib.JSONVerifier, server gomatrixserverlib.ServerName, ) gomatrixserverlib.AuthChainProvider { // A list of events that we have retried, if they were not included in diff --git a/federationapi/queue/destinationqueue.go b/federationapi/queue/destinationqueue.go index 74794040..b6edec5d 100644 --- a/federationapi/queue/destinationqueue.go +++ b/federationapi/queue/destinationqueue.go @@ -21,6 +21,7 @@ import ( "sync" "time" + fedapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/statistics" "github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/dendrite/federationapi/storage/shared" @@ -49,21 +50,21 @@ type destinationQueue struct { db storage.Database process *process.ProcessContext signing *SigningInfo - rsAPI api.RoomserverInternalAPI - client *gomatrixserverlib.FederationClient // federation client - origin gomatrixserverlib.ServerName // origin of requests - destination gomatrixserverlib.ServerName // destination of requests - running atomic.Bool // is the queue worker running? - backingOff atomic.Bool // true if we're backing off - overflowed atomic.Bool // the queues exceed maxPDUsInMemory/maxEDUsInMemory, so we should consult the database for more - statistics *statistics.ServerStatistics // statistics about this remote server - transactionIDMutex sync.Mutex // protects transactionID - transactionID gomatrixserverlib.TransactionID // last transaction ID if retrying, or "" if last txn was successful - notify chan struct{} // interrupts idle wait pending PDUs/EDUs - pendingPDUs []*queuedPDU // PDUs waiting to be sent - pendingEDUs []*queuedEDU // EDUs waiting to be sent - pendingMutex sync.RWMutex // protects pendingPDUs and pendingEDUs - interruptBackoff chan bool // interrupts backoff + rsAPI api.FederationRoomserverAPI + client fedapi.FederationClient // federation client + origin gomatrixserverlib.ServerName // origin of requests + destination gomatrixserverlib.ServerName // destination of requests + running atomic.Bool // is the queue worker running? + backingOff atomic.Bool // true if we're backing off + overflowed atomic.Bool // the queues exceed maxPDUsInMemory/maxEDUsInMemory, so we should consult the database for more + statistics *statistics.ServerStatistics // statistics about this remote server + transactionIDMutex sync.Mutex // protects transactionID + transactionID gomatrixserverlib.TransactionID // last transaction ID if retrying, or "" if last txn was successful + notify chan struct{} // interrupts idle wait pending PDUs/EDUs + pendingPDUs []*queuedPDU // PDUs waiting to be sent + pendingEDUs []*queuedEDU // EDUs waiting to be sent + pendingMutex sync.RWMutex // protects pendingPDUs and pendingEDUs + interruptBackoff chan bool // interrupts backoff } // Send event adds the event to the pending queue for the destination. diff --git a/federationapi/queue/queue.go b/federationapi/queue/queue.go index d152886f..4c25c4ce 100644 --- a/federationapi/queue/queue.go +++ b/federationapi/queue/queue.go @@ -26,6 +26,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/tidwall/gjson" + fedapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/statistics" "github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/dendrite/federationapi/storage/shared" @@ -39,9 +40,9 @@ type OutgoingQueues struct { db storage.Database process *process.ProcessContext disabled bool - rsAPI api.RoomserverInternalAPI + rsAPI api.FederationRoomserverAPI origin gomatrixserverlib.ServerName - client *gomatrixserverlib.FederationClient + client fedapi.FederationClient statistics *statistics.Statistics signing *SigningInfo queuesMutex sync.Mutex // protects the below @@ -85,8 +86,8 @@ func NewOutgoingQueues( process *process.ProcessContext, disabled bool, origin gomatrixserverlib.ServerName, - client *gomatrixserverlib.FederationClient, - rsAPI api.RoomserverInternalAPI, + client fedapi.FederationClient, + rsAPI api.FederationRoomserverAPI, statistics *statistics.Statistics, signing *SigningInfo, ) *OutgoingQueues { diff --git a/federationapi/routing/query.go b/federationapi/routing/query.go index 707b7b01..316c61a1 100644 --- a/federationapi/routing/query.go +++ b/federationapi/routing/query.go @@ -30,7 +30,7 @@ import ( // RoomAliasToID converts the queried alias into a room ID and returns it func RoomAliasToID( httpReq *http.Request, - federation *gomatrixserverlib.FederationClient, + federation federationAPI.FederationClient, cfg *config.FederationAPI, rsAPI roomserverAPI.FederationRoomserverAPI, senderAPI federationAPI.FederationInternalAPI, diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index 9f95ed07..e25f9866 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -54,7 +54,7 @@ func Setup( rsAPI roomserverAPI.FederationRoomserverAPI, fsAPI *fedInternal.FederationInternalAPI, keys gomatrixserverlib.JSONVerifier, - federation *gomatrixserverlib.FederationClient, + federation federationAPI.FederationClient, userAPI userapi.FederationUserAPI, keyAPI keyserverAPI.FederationKeyAPI, mscCfg *config.MSCs, diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 55a11367..c25dabce 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -85,7 +85,7 @@ func Send( rsAPI api.FederationRoomserverAPI, keyAPI keyapi.FederationKeyAPI, keys gomatrixserverlib.JSONVerifier, - federation *gomatrixserverlib.FederationClient, + federation federationAPI.FederationClient, mu *internal.MutexByRoom, servers federationAPI.ServersInRoomProvider, producer *producers.SyncAPIProducer, diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index 011d4e34..a111580c 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -8,8 +8,8 @@ import ( "time" "github.com/matrix-org/dendrite/internal" - "github.com/matrix-org/dendrite/internal/test" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/test" "github.com/matrix-org/gomatrixserverlib" ) diff --git a/federationapi/routing/threepid.go b/federationapi/routing/threepid.go index 16f245ce..ccde9168 100644 --- a/federationapi/routing/threepid.go +++ b/federationapi/routing/threepid.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" + federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -57,7 +58,7 @@ var ( func CreateInvitesFrom3PIDInvites( req *http.Request, rsAPI api.FederationRoomserverAPI, cfg *config.FederationAPI, - federation *gomatrixserverlib.FederationClient, + federation federationAPI.FederationClient, userAPI userapi.FederationUserAPI, ) util.JSONResponse { var body invites @@ -107,7 +108,7 @@ func ExchangeThirdPartyInvite( roomID string, rsAPI api.FederationRoomserverAPI, cfg *config.FederationAPI, - federation *gomatrixserverlib.FederationClient, + federation federationAPI.FederationClient, ) util.JSONResponse { var builder gomatrixserverlib.EventBuilder if err := json.Unmarshal(request.Content(), &builder); err != nil { @@ -165,7 +166,12 @@ func ExchangeThirdPartyInvite( // Ask the requesting server to sign the newly created event so we know it // acknowledged it - signedEvent, err := federation.SendInvite(httpReq.Context(), request.Origin(), event) + inviteReq, err := gomatrixserverlib.NewInviteV2Request(event.Headered(verRes.RoomVersion), nil) + if err != nil { + util.GetLogger(httpReq.Context()).WithError(err).Error("failed to make invite v2 request") + return jsonerror.InternalServerError() + } + signedEvent, err := federation.SendInviteV2(httpReq.Context(), request.Origin(), inviteReq) if err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("federation.SendInvite failed") return jsonerror.InternalServerError() @@ -205,7 +211,7 @@ func ExchangeThirdPartyInvite( func createInviteFrom3PIDInvite( ctx context.Context, rsAPI api.FederationRoomserverAPI, cfg *config.FederationAPI, - inv invite, federation *gomatrixserverlib.FederationClient, + inv invite, federation federationAPI.FederationClient, userAPI userapi.FederationUserAPI, ) (*gomatrixserverlib.Event, error) { verReq := api.QueryRoomVersionForRoomRequest{RoomID: inv.RoomID} @@ -335,7 +341,7 @@ func buildMembershipEvent( // them responded with an error. func sendToRemoteServer( ctx context.Context, inv invite, - federation *gomatrixserverlib.FederationClient, _ *config.FederationAPI, + federation federationAPI.FederationClient, _ *config.FederationAPI, builder gomatrixserverlib.EventBuilder, ) (err error) { remoteServers := make([]gomatrixserverlib.ServerName, 2) diff --git a/federationapi/storage/interface.go b/federationapi/storage/interface.go index e3038651..29254948 100644 --- a/federationapi/storage/interface.go +++ b/federationapi/storage/interface.go @@ -25,13 +25,12 @@ import ( type Database interface { gomatrixserverlib.KeyDatabase - UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error) + UpdateRoom(ctx context.Context, roomID string, addHosts []types.JoinedHost, removeHosts []string, purgeRoomFirst bool) (joinedHosts []types.JoinedHost, err error) GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error) GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) // GetJoinedHostsForRooms returns the complete set of servers in the rooms given. GetJoinedHostsForRooms(ctx context.Context, roomIDs []string, excludeSelf bool) ([]gomatrixserverlib.ServerName, error) - PurgeRoomState(ctx context.Context, roomID string) error StoreJSON(ctx context.Context, js string) (*shared.Receipt, error) diff --git a/federationapi/storage/postgres/joined_hosts_table.go b/federationapi/storage/postgres/joined_hosts_table.go index 5c95b72a..bb6f6bfa 100644 --- a/federationapi/storage/postgres/joined_hosts_table.go +++ b/federationapi/storage/postgres/joined_hosts_table.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" ) const joinedHostsSchema = ` @@ -111,6 +112,7 @@ func (s *joinedHostsStatements) InsertJoinedHosts( roomID, eventID string, serverName gomatrixserverlib.ServerName, ) error { + logrus.Debugf("FederationJoinedHosts: INSERT %v %v %v", roomID, eventID, serverName) stmt := sqlutil.TxStmt(txn, s.insertJoinedHostsStmt) _, err := stmt.ExecContext(ctx, roomID, eventID, serverName) return err @@ -119,6 +121,7 @@ func (s *joinedHostsStatements) InsertJoinedHosts( func (s *joinedHostsStatements) DeleteJoinedHosts( ctx context.Context, txn *sql.Tx, eventIDs []string, ) error { + logrus.Debugf("FederationJoinedHosts: DELETE WITH EVENTS %v", eventIDs) stmt := sqlutil.TxStmt(txn, s.deleteJoinedHostsStmt) _, err := stmt.ExecContext(ctx, pq.StringArray(eventIDs)) return err @@ -127,6 +130,7 @@ func (s *joinedHostsStatements) DeleteJoinedHosts( func (s *joinedHostsStatements) DeleteJoinedHostsForRoom( ctx context.Context, txn *sql.Tx, roomID string, ) error { + logrus.Debugf("FederationJoinedHosts: DELETE ALL IN ROOM %v", roomID) stmt := sqlutil.TxStmt(txn, s.deleteJoinedHostsForRoomStmt) _, err := stmt.ExecContext(ctx, roomID) return err @@ -207,6 +211,7 @@ func joinedHostsFromStmt( ServerName: gomatrixserverlib.ServerName(serverName), }) } + logrus.Debugf("FederationJoinedHosts: SELECT %v => %+v", roomID, result) return result, rows.Err() } diff --git a/federationapi/storage/shared/storage.go b/federationapi/storage/shared/storage.go index 160c7f6f..a00d782f 100644 --- a/federationapi/storage/shared/storage.go +++ b/federationapi/storage/shared/storage.go @@ -63,11 +63,21 @@ func (r *Receipt) String() string { // this isn't a duplicate message. func (d *Database) UpdateRoom( ctx context.Context, - roomID, oldEventID, newEventID string, + roomID string, addHosts []types.JoinedHost, removeHosts []string, + purgeRoomFirst bool, ) (joinedHosts []types.JoinedHost, err error) { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + if purgeRoomFirst { + // If the event is a create event then we'll delete all of the existing + // data for the room. The only reason that a create event would be replayed + // to us in this way is if we're about to receive the entire room state. + if err = d.FederationJoinedHosts.DeleteJoinedHostsForRoom(ctx, txn, roomID); err != nil { + return fmt.Errorf("d.FederationJoinedHosts.DeleteJoinedHosts: %w", err) + } + } + joinedHosts, err = d.FederationJoinedHosts.SelectJoinedHostsWithTx(ctx, txn, roomID) if err != nil { return err @@ -138,20 +148,6 @@ func (d *Database) StoreJSON( }, nil } -func (d *Database) PurgeRoomState( - ctx context.Context, roomID string, -) error { - return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - // If the event is a create event then we'll delete all of the existing - // data for the room. The only reason that a create event would be replayed - // to us in this way is if we're about to receive the entire room state. - if err := d.FederationJoinedHosts.DeleteJoinedHostsForRoom(ctx, txn, roomID); err != nil { - return fmt.Errorf("d.FederationJoinedHosts.DeleteJoinedHosts: %w", err) - } - return nil - }) -} - func (d *Database) AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.FederationBlacklist.InsertBlacklist(context.TODO(), txn, serverName) |