aboutsummaryrefslogtreecommitdiff
path: root/appservice
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2023-12-12 12:13:55 +0100
committerGitHub <noreply@github.com>2023-12-12 12:13:55 +0100
commit1555b3542d7da8243abc21e6a29758ebe419e5b5 (patch)
treefb717176258a440eb05d4101338fba867b62210e /appservice
parent185ad6b00de8079fb6a638ed63e392dfe9f2b49f (diff)
Introduce a new stream for the appservice consumer (#3277)
This introduces a new stream the syncAPI produces to once it processed a `OutputRoomEvent` and the appservices consumes. This is to work around a race condition where appservices receive an event before the syncAPI has handled it, this can result in e.g. calls to `/joined_members` returning a wrong membership list.
Diffstat (limited to 'appservice')
-rw-r--r--appservice/appservice_test.go197
-rw-r--r--appservice/consumers/roomserver.go12
2 files changed, 208 insertions, 1 deletions
diff --git a/appservice/appservice_test.go b/appservice/appservice_test.go
index eca63371..aa2af9f2 100644
--- a/appservice/appservice_test.go
+++ b/appservice/appservice_test.go
@@ -14,8 +14,18 @@ import (
"testing"
"time"
+ "github.com/matrix-org/dendrite/clientapi"
+ "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/federationapi/statistics"
+ "github.com/matrix-org/dendrite/internal/httputil"
+ "github.com/matrix-org/dendrite/roomserver/types"
+ "github.com/matrix-org/dendrite/syncapi"
+ uapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ "github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
+ "github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/appservice/api"
@@ -407,3 +417,190 @@ func TestRoomserverConsumerOneInvite(t *testing.T) {
close(evChan)
})
}
+
+// Note: If this test panics, it is because we timed out waiting for the
+// join event to come through to the appservice and we close the DB/shutdown Dendrite. This makes the
+// syncAPI unhappy, as it is unable to write to the database.
+func TestOutputAppserviceEvent(t *testing.T) {
+ alice := test.NewUser(t)
+ bob := test.NewUser(t)
+
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ cfg, processCtx, closeDB := testrig.CreateConfig(t, dbType)
+ defer closeDB()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ natsInstance := &jetstream.NATSInstance{}
+
+ evChan := make(chan struct{})
+
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
+ // Create required internal APIs
+ rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
+ rsAPI.SetFederationAPI(nil, nil)
+
+ // Create the router, so we can hit `/joined_members`
+ routers := httputil.NewRouters()
+
+ accessTokens := map[*test.User]userDevice{
+ bob: {},
+ }
+
+ usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
+ clientapi.AddPublicRoutes(processCtx, routers, cfg, natsInstance, nil, rsAPI, nil, nil, nil, usrAPI, nil, nil, caching.DisableMetrics)
+ createAccessTokens(t, accessTokens, usrAPI, processCtx.Context(), routers)
+
+ room := test.NewRoom(t, alice)
+
+ // Invite Bob
+ room.CreateAndInsert(t, alice, spec.MRoomMember, map[string]interface{}{
+ "membership": "invite",
+ }, test.WithStateKey(bob.ID))
+
+ // create a dummy AS url, handling the events
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ var txn consumers.ApplicationServiceTransaction
+ err := json.NewDecoder(r.Body).Decode(&txn)
+ if err != nil {
+ t.Fatal(err)
+ }
+ for _, ev := range txn.Events {
+ if ev.Type != spec.MRoomMember {
+ continue
+ }
+ if ev.StateKey != nil && *ev.StateKey == bob.ID {
+ membership := gjson.GetBytes(ev.Content, "membership").Str
+ t.Logf("Processing membership: %s", membership)
+ switch membership {
+ case spec.Invite:
+ // Accept the invite
+ joinEv := room.CreateAndInsert(t, bob, spec.MRoomMember, map[string]interface{}{
+ "membership": "join",
+ }, test.WithStateKey(bob.ID))
+
+ if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, []*types.HeaderedEvent{joinEv}, "test", "test", "test", nil, false); err != nil {
+ t.Fatalf("failed to send events: %v", err)
+ }
+ case spec.Join: // the AS has received the join event, now hit `/joined_members` to validate that
+ rec := httptest.NewRecorder()
+ req := httptest.NewRequest(http.MethodGet, "/_matrix/client/v3/rooms/"+room.ID+"/joined_members", nil)
+ req.Header.Set("Authorization", "Bearer "+accessTokens[bob].accessToken)
+ routers.Client.ServeHTTP(rec, req)
+ if rec.Code != http.StatusOK {
+ t.Fatalf("expected HTTP 200, got %d: %s", rec.Code, rec.Body.String())
+ }
+
+ // Both Alice and Bob should be joined. If not, we have a race condition
+ if !gjson.GetBytes(rec.Body.Bytes(), "joined."+alice.ID).Exists() {
+ t.Errorf("Alice is not joined to the room") // in theory should not happen
+ }
+ if !gjson.GetBytes(rec.Body.Bytes(), "joined."+bob.ID).Exists() {
+ t.Errorf("Bob is not joined to the room")
+ }
+ evChan <- struct{}{}
+ default:
+ t.Fatalf("Unexpected membership: %s", membership)
+ }
+ }
+ }
+ }))
+ defer srv.Close()
+
+ as := &config.ApplicationService{
+ ID: "someID",
+ URL: srv.URL,
+ ASToken: "",
+ HSToken: "",
+ SenderLocalpart: "senderLocalPart",
+ NamespaceMap: map[string][]config.ApplicationServiceNamespace{
+ "users": {{RegexpObject: regexp.MustCompile(bob.ID)}},
+ "aliases": {{RegexpObject: regexp.MustCompile(room.ID)}},
+ },
+ }
+ as.CreateHTTPClient(cfg.AppServiceAPI.DisableTLSValidation)
+
+ // Create a dummy application service
+ cfg.AppServiceAPI.Derived.ApplicationServices = []config.ApplicationService{*as}
+
+ // Prepare AS Streams on the old topic to validate that they get deleted
+ jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+
+ token := jetstream.Tokenise(as.ID)
+ if err := jetstream.JetStreamConsumer(
+ processCtx.Context(), jsCtx, cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent),
+ cfg.Global.JetStream.Durable("Appservice_"+token),
+ 50, // maximum number of events to send in a single transaction
+ func(ctx context.Context, msgs []*nats.Msg) bool {
+ return true
+ },
+ ); err != nil {
+ t.Fatal(err)
+ }
+
+ // Start the syncAPI to have `/joined_members` available
+ syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, natsInstance, usrAPI, rsAPI, caches, caching.DisableMetrics)
+
+ // start the consumer
+ appservice.NewInternalAPI(processCtx, cfg, natsInstance, usrAPI, rsAPI)
+
+ // At this point, the old JetStream consumers should be deleted
+ for consumer := range jsCtx.Consumers(cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)) {
+ if consumer.Name == cfg.Global.JetStream.Durable("Appservice_"+token)+"Pull" {
+ t.Fatalf("Consumer still exists")
+ }
+ }
+
+ // Create the room, this triggers the AS to receive an invite for Bob.
+ if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
+ t.Fatalf("failed to send events: %v", err)
+ }
+
+ select {
+ // Pretty generous timeout duration...
+ case <-time.After(time.Millisecond * 1000): // wait for the AS to process the events
+ t.Errorf("Timed out waiting for join event")
+ case <-evChan:
+ }
+ close(evChan)
+ })
+}
+
+type userDevice struct {
+ accessToken string
+ deviceID string
+ password string
+}
+
+func createAccessTokens(t *testing.T, accessTokens map[*test.User]userDevice, userAPI uapi.UserInternalAPI, ctx context.Context, routers httputil.Routers) {
+ t.Helper()
+ for u := range accessTokens {
+ localpart, serverName, _ := gomatrixserverlib.SplitID('@', u.ID)
+ userRes := &uapi.PerformAccountCreationResponse{}
+ password := util.RandomString(8)
+ if err := userAPI.PerformAccountCreation(ctx, &uapi.PerformAccountCreationRequest{
+ AccountType: u.AccountType,
+ Localpart: localpart,
+ ServerName: serverName,
+ Password: password,
+ }, userRes); err != nil {
+ t.Errorf("failed to create account: %s", err)
+ }
+ req := test.NewRequest(t, http.MethodPost, "/_matrix/client/v3/login", test.WithJSONBody(t, map[string]interface{}{
+ "type": authtypes.LoginTypePassword,
+ "identifier": map[string]interface{}{
+ "type": "m.id.user",
+ "user": u.ID,
+ },
+ "password": password,
+ }))
+ rec := httptest.NewRecorder()
+ routers.Client.ServeHTTP(rec, req)
+ if rec.Code != http.StatusOK {
+ t.Fatalf("failed to login: %s", rec.Body.String())
+ }
+ accessTokens[u] = userDevice{
+ accessToken: gjson.GetBytes(rec.Body.Bytes(), "access_token").String(),
+ deviceID: gjson.GetBytes(rec.Body.Bytes(), "device_id").String(),
+ password: password,
+ }
+ }
+}
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index e8b9211c..b7fc1f69 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -71,13 +71,14 @@ func NewOutputRoomEventConsumer(
ctx: process.Context(),
cfg: cfg,
jetstream: js,
- topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputAppserviceEvent),
rsAPI: rsAPI,
}
}
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
+ durableNames := make([]string, 0, len(s.cfg.Derived.ApplicationServices))
for _, as := range s.cfg.Derived.ApplicationServices {
appsvc := as
state := &appserviceState{
@@ -95,6 +96,15 @@ func (s *OutputRoomEventConsumer) Start() error {
); err != nil {
return fmt.Errorf("failed to create %q consumer: %w", token, err)
}
+ durableNames = append(durableNames, s.cfg.Matrix.JetStream.Durable("Appservice_"+token))
+ }
+ // Cleanup any consumers still existing on the OutputRoomEvent stream
+ // to avoid messages not being deleted
+ for _, consumerName := range durableNames {
+ err := s.jetstream.DeleteConsumer(s.cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), consumerName+"Pull")
+ if err != nil && err != nats.ErrConsumerNotFound {
+ return err
+ }
}
return nil
}