aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--appservice/appservice_test.go197
-rw-r--r--appservice/consumers/roomserver.go12
-rw-r--r--setup/jetstream/streams.go6
-rw-r--r--syncapi/consumers/roomserver.go9
-rw-r--r--syncapi/producers/appservices.go33
-rw-r--r--syncapi/syncapi.go9
6 files changed, 264 insertions, 2 deletions
diff --git a/appservice/appservice_test.go b/appservice/appservice_test.go
index eca63371..aa2af9f2 100644
--- a/appservice/appservice_test.go
+++ b/appservice/appservice_test.go
@@ -14,8 +14,18 @@ import (
"testing"
"time"
+ "github.com/matrix-org/dendrite/clientapi"
+ "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/federationapi/statistics"
+ "github.com/matrix-org/dendrite/internal/httputil"
+ "github.com/matrix-org/dendrite/roomserver/types"
+ "github.com/matrix-org/dendrite/syncapi"
+ uapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ "github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
+ "github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/appservice/api"
@@ -407,3 +417,190 @@ func TestRoomserverConsumerOneInvite(t *testing.T) {
close(evChan)
})
}
+
+// Note: If this test panics, it is because we timed out waiting for the
+// join event to come through to the appservice and we close the DB/shutdown Dendrite. This makes the
+// syncAPI unhappy, as it is unable to write to the database.
+func TestOutputAppserviceEvent(t *testing.T) {
+ alice := test.NewUser(t)
+ bob := test.NewUser(t)
+
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ cfg, processCtx, closeDB := testrig.CreateConfig(t, dbType)
+ defer closeDB()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ natsInstance := &jetstream.NATSInstance{}
+
+ evChan := make(chan struct{})
+
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
+ // Create required internal APIs
+ rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
+ rsAPI.SetFederationAPI(nil, nil)
+
+ // Create the router, so we can hit `/joined_members`
+ routers := httputil.NewRouters()
+
+ accessTokens := map[*test.User]userDevice{
+ bob: {},
+ }
+
+ usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
+ clientapi.AddPublicRoutes(processCtx, routers, cfg, natsInstance, nil, rsAPI, nil, nil, nil, usrAPI, nil, nil, caching.DisableMetrics)
+ createAccessTokens(t, accessTokens, usrAPI, processCtx.Context(), routers)
+
+ room := test.NewRoom(t, alice)
+
+ // Invite Bob
+ room.CreateAndInsert(t, alice, spec.MRoomMember, map[string]interface{}{
+ "membership": "invite",
+ }, test.WithStateKey(bob.ID))
+
+ // create a dummy AS url, handling the events
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ var txn consumers.ApplicationServiceTransaction
+ err := json.NewDecoder(r.Body).Decode(&txn)
+ if err != nil {
+ t.Fatal(err)
+ }
+ for _, ev := range txn.Events {
+ if ev.Type != spec.MRoomMember {
+ continue
+ }
+ if ev.StateKey != nil && *ev.StateKey == bob.ID {
+ membership := gjson.GetBytes(ev.Content, "membership").Str
+ t.Logf("Processing membership: %s", membership)
+ switch membership {
+ case spec.Invite:
+ // Accept the invite
+ joinEv := room.CreateAndInsert(t, bob, spec.MRoomMember, map[string]interface{}{
+ "membership": "join",
+ }, test.WithStateKey(bob.ID))
+
+ if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, []*types.HeaderedEvent{joinEv}, "test", "test", "test", nil, false); err != nil {
+ t.Fatalf("failed to send events: %v", err)
+ }
+ case spec.Join: // the AS has received the join event, now hit `/joined_members` to validate that
+ rec := httptest.NewRecorder()
+ req := httptest.NewRequest(http.MethodGet, "/_matrix/client/v3/rooms/"+room.ID+"/joined_members", nil)
+ req.Header.Set("Authorization", "Bearer "+accessTokens[bob].accessToken)
+ routers.Client.ServeHTTP(rec, req)
+ if rec.Code != http.StatusOK {
+ t.Fatalf("expected HTTP 200, got %d: %s", rec.Code, rec.Body.String())
+ }
+
+ // Both Alice and Bob should be joined. If not, we have a race condition
+ if !gjson.GetBytes(rec.Body.Bytes(), "joined."+alice.ID).Exists() {
+ t.Errorf("Alice is not joined to the room") // in theory should not happen
+ }
+ if !gjson.GetBytes(rec.Body.Bytes(), "joined."+bob.ID).Exists() {
+ t.Errorf("Bob is not joined to the room")
+ }
+ evChan <- struct{}{}
+ default:
+ t.Fatalf("Unexpected membership: %s", membership)
+ }
+ }
+ }
+ }))
+ defer srv.Close()
+
+ as := &config.ApplicationService{
+ ID: "someID",
+ URL: srv.URL,
+ ASToken: "",
+ HSToken: "",
+ SenderLocalpart: "senderLocalPart",
+ NamespaceMap: map[string][]config.ApplicationServiceNamespace{
+ "users": {{RegexpObject: regexp.MustCompile(bob.ID)}},
+ "aliases": {{RegexpObject: regexp.MustCompile(room.ID)}},
+ },
+ }
+ as.CreateHTTPClient(cfg.AppServiceAPI.DisableTLSValidation)
+
+ // Create a dummy application service
+ cfg.AppServiceAPI.Derived.ApplicationServices = []config.ApplicationService{*as}
+
+ // Prepare AS Streams on the old topic to validate that they get deleted
+ jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+
+ token := jetstream.Tokenise(as.ID)
+ if err := jetstream.JetStreamConsumer(
+ processCtx.Context(), jsCtx, cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent),
+ cfg.Global.JetStream.Durable("Appservice_"+token),
+ 50, // maximum number of events to send in a single transaction
+ func(ctx context.Context, msgs []*nats.Msg) bool {
+ return true
+ },
+ ); err != nil {
+ t.Fatal(err)
+ }
+
+ // Start the syncAPI to have `/joined_members` available
+ syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, natsInstance, usrAPI, rsAPI, caches, caching.DisableMetrics)
+
+ // start the consumer
+ appservice.NewInternalAPI(processCtx, cfg, natsInstance, usrAPI, rsAPI)
+
+ // At this point, the old JetStream consumers should be deleted
+ for consumer := range jsCtx.Consumers(cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)) {
+ if consumer.Name == cfg.Global.JetStream.Durable("Appservice_"+token)+"Pull" {
+ t.Fatalf("Consumer still exists")
+ }
+ }
+
+ // Create the room, this triggers the AS to receive an invite for Bob.
+ if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
+ t.Fatalf("failed to send events: %v", err)
+ }
+
+ select {
+ // Pretty generous timeout duration...
+ case <-time.After(time.Millisecond * 1000): // wait for the AS to process the events
+ t.Errorf("Timed out waiting for join event")
+ case <-evChan:
+ }
+ close(evChan)
+ })
+}
+
+type userDevice struct {
+ accessToken string
+ deviceID string
+ password string
+}
+
+func createAccessTokens(t *testing.T, accessTokens map[*test.User]userDevice, userAPI uapi.UserInternalAPI, ctx context.Context, routers httputil.Routers) {
+ t.Helper()
+ for u := range accessTokens {
+ localpart, serverName, _ := gomatrixserverlib.SplitID('@', u.ID)
+ userRes := &uapi.PerformAccountCreationResponse{}
+ password := util.RandomString(8)
+ if err := userAPI.PerformAccountCreation(ctx, &uapi.PerformAccountCreationRequest{
+ AccountType: u.AccountType,
+ Localpart: localpart,
+ ServerName: serverName,
+ Password: password,
+ }, userRes); err != nil {
+ t.Errorf("failed to create account: %s", err)
+ }
+ req := test.NewRequest(t, http.MethodPost, "/_matrix/client/v3/login", test.WithJSONBody(t, map[string]interface{}{
+ "type": authtypes.LoginTypePassword,
+ "identifier": map[string]interface{}{
+ "type": "m.id.user",
+ "user": u.ID,
+ },
+ "password": password,
+ }))
+ rec := httptest.NewRecorder()
+ routers.Client.ServeHTTP(rec, req)
+ if rec.Code != http.StatusOK {
+ t.Fatalf("failed to login: %s", rec.Body.String())
+ }
+ accessTokens[u] = userDevice{
+ accessToken: gjson.GetBytes(rec.Body.Bytes(), "access_token").String(),
+ deviceID: gjson.GetBytes(rec.Body.Bytes(), "device_id").String(),
+ password: password,
+ }
+ }
+}
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index e8b9211c..b7fc1f69 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -71,13 +71,14 @@ func NewOutputRoomEventConsumer(
ctx: process.Context(),
cfg: cfg,
jetstream: js,
- topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputAppserviceEvent),
rsAPI: rsAPI,
}
}
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
+ durableNames := make([]string, 0, len(s.cfg.Derived.ApplicationServices))
for _, as := range s.cfg.Derived.ApplicationServices {
appsvc := as
state := &appserviceState{
@@ -95,6 +96,15 @@ func (s *OutputRoomEventConsumer) Start() error {
); err != nil {
return fmt.Errorf("failed to create %q consumer: %w", token, err)
}
+ durableNames = append(durableNames, s.cfg.Matrix.JetStream.Durable("Appservice_"+token))
+ }
+ // Cleanup any consumers still existing on the OutputRoomEvent stream
+ // to avoid messages not being deleted
+ for _, consumerName := range durableNames {
+ err := s.jetstream.DeleteConsumer(s.cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), consumerName+"Pull")
+ if err != nil && err != nats.ErrConsumerNotFound {
+ return err
+ }
}
return nil
}
diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go
index 74140792..1dc9f4ce 100644
--- a/setup/jetstream/streams.go
+++ b/setup/jetstream/streams.go
@@ -20,6 +20,7 @@ var (
InputDeviceListUpdate = "InputDeviceListUpdate"
InputSigningKeyUpdate = "InputSigningKeyUpdate"
OutputRoomEvent = "OutputRoomEvent"
+ OutputAppserviceEvent = "OutputAppserviceEvent"
OutputSendToDeviceEvent = "OutputSendToDeviceEvent"
OutputKeyChangeEvent = "OutputKeyChangeEvent"
OutputTypingEvent = "OutputTypingEvent"
@@ -66,6 +67,11 @@ var streams = []*nats.StreamConfig{
Storage: nats.FileStorage,
},
{
+ Name: OutputAppserviceEvent,
+ Retention: nats.InterestPolicy,
+ Storage: nats.FileStorage,
+ },
+ {
Name: OutputSendToDeviceEvent,
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 666f900d..81c532f1 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
+ "github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -55,6 +56,7 @@ type OutputRoomEventConsumer struct {
inviteStream streams.StreamProvider
notifier *notifier.Notifier
fts fulltext.Indexer
+ asProducer *producers.AppserviceEventProducer
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@@ -68,6 +70,7 @@ func NewOutputRoomEventConsumer(
inviteStream streams.StreamProvider,
rsAPI api.SyncRoomserverAPI,
fts *fulltext.Search,
+ asProducer *producers.AppserviceEventProducer,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
ctx: process.Context(),
@@ -81,6 +84,7 @@ func NewOutputRoomEventConsumer(
inviteStream: inviteStream,
rsAPI: rsAPI,
fts: fts,
+ asProducer: asProducer,
}
}
@@ -119,6 +123,11 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
}
}
err = s.onNewRoomEvent(s.ctx, *output.NewRoomEvent)
+ if err == nil && s.asProducer != nil {
+ if err = s.asProducer.ProduceRoomEvents(msg); err != nil {
+ log.WithError(err).Warn("failed to produce OutputAppserviceEvent")
+ }
+ }
case api.OutputTypeOldRoomEvent:
err = s.onOldRoomEvent(s.ctx, *output.OldRoomEvent)
case api.OutputTypeNewInviteEvent:
diff --git a/syncapi/producers/appservices.go b/syncapi/producers/appservices.go
new file mode 100644
index 00000000..bb0dbb9c
--- /dev/null
+++ b/syncapi/producers/appservices.go
@@ -0,0 +1,33 @@
+// Copyright 2023 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producers
+
+import (
+ "github.com/nats-io/nats.go"
+)
+
+// AppserviceEventProducer produces events for the appservice API to consume
+type AppserviceEventProducer struct {
+ Topic string
+ JetStream nats.JetStreamContext
+}
+
+func (a *AppserviceEventProducer) ProduceRoomEvents(
+ msg *nats.Msg,
+) error {
+ msg.Subject = a.Topic
+ _, err := a.JetStream.PublishMsg(msg)
+ return err
+}
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 091e3db4..0418ffc0 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -100,9 +100,16 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start key change consumer")
}
+ var asProducer *producers.AppserviceEventProducer
+ if len(dendriteCfg.AppServiceAPI.Derived.ApplicationServices) > 0 {
+ asProducer = &producers.AppserviceEventProducer{
+ JetStream: js, Topic: dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputAppserviceEvent),
+ }
+ }
+
roomConsumer := consumers.NewOutputRoomEventConsumer(
processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.PDUStreamProvider,
- streams.InviteStreamProvider, rsAPI, fts,
+ streams.InviteStreamProvider, rsAPI, fts, asProducer,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")