aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--clientapi/routing/admin.go22
-rw-r--r--clientapi/routing/routing.go17
-rw-r--r--cmd/generate-config/main.go9
-rw-r--r--dendrite-sample.monolith.yaml9
-rw-r--r--dendrite-sample.polylith.yaml9
-rw-r--r--docs/administration/4_adminapi.md5
-rw-r--r--docs/installation/7_configuration.md4
-rw-r--r--internal/fulltext/bleve.go3
-rw-r--r--internal/fulltext/bleve_test.go10
-rw-r--r--setup/base/base.go7
-rw-r--r--setup/config/config_syncapi.go10
-rw-r--r--syncapi/consumers/clientapi.go109
-rw-r--r--syncapi/consumers/roomserver.go54
-rw-r--r--syncapi/routing/context.go7
-rw-r--r--syncapi/routing/routing.go28
-rw-r--r--syncapi/routing/search.go344
-rw-r--r--syncapi/storage/interface.go1
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go28
-rw-r--r--syncapi/storage/shared/syncserver.go8
-rw-r--r--syncapi/storage/sqlite3/output_room_events_table.go41
-rw-r--r--syncapi/storage/tables/interface.go1
-rw-r--r--syncapi/syncapi.go7
22 files changed, 680 insertions, 53 deletions
diff --git a/clientapi/routing/admin.go b/clientapi/routing/admin.go
index 0c5f8c16..5089d7c3 100644
--- a/clientapi/routing/admin.go
+++ b/clientapi/routing/admin.go
@@ -3,15 +3,19 @@ package routing
import (
"encoding/json"
"net/http"
+ "time"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/httputil"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
+ "github.com/nats-io/nats.go"
+ "github.com/sirupsen/logrus"
)
func AdminEvacuateRoom(req *http.Request, cfg *config.ClientAPI, device *userapi.Device, rsAPI roomserverAPI.ClientRoomserverAPI) util.JSONResponse {
@@ -138,3 +142,21 @@ func AdminResetPassword(req *http.Request, cfg *config.ClientAPI, device *userap
},
}
}
+
+func AdminReindex(req *http.Request, cfg *config.ClientAPI, device *userapi.Device, natsClient *nats.Conn) util.JSONResponse {
+ if device.AccountType != userapi.AccountTypeAdmin {
+ return util.JSONResponse{
+ Code: http.StatusForbidden,
+ JSON: jsonerror.Forbidden("This API can only be used by admin users."),
+ }
+ }
+ _, err := natsClient.RequestMsg(nats.NewMsg(cfg.Matrix.JetStream.Prefixed(jetstream.InputFulltextReindex)), time.Second*10)
+ if err != nil {
+ logrus.WithError(err).Error("failed to publish nats message")
+ return jsonerror.InternalServerError()
+ }
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: struct{}{},
+ }
+}
diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go
index d7a48d22..9c1f8f72 100644
--- a/clientapi/routing/routing.go
+++ b/clientapi/routing/routing.go
@@ -20,6 +20,12 @@ import (
"strings"
"github.com/gorilla/mux"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ "github.com/nats-io/nats.go"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
+
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/api"
"github.com/matrix-org/dendrite/clientapi/auth"
@@ -34,11 +40,6 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api"
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/matrix-org/util"
- "github.com/nats-io/nats.go"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/sirupsen/logrus"
)
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
@@ -161,6 +162,12 @@ func Setup(
}),
).Methods(http.MethodPost, http.MethodOptions)
+ dendriteAdminRouter.Handle("/admin/fulltext/reindex",
+ httputil.MakeAuthAPI("admin_fultext_reindex", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
+ return AdminReindex(req, cfg, device, natsClient)
+ }),
+ ).Methods(http.MethodGet, http.MethodOptions)
+
// server notifications
if cfg.Matrix.ServerNotices.Enabled {
logrus.Info("Enabling server notices at /_synapse/admin/v1/send_server_notice")
diff --git a/cmd/generate-config/main.go b/cmd/generate-config/main.go
index c24e8153..8b042c56 100644
--- a/cmd/generate-config/main.go
+++ b/cmd/generate-config/main.go
@@ -5,10 +5,11 @@ import (
"fmt"
"path/filepath"
- "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"golang.org/x/crypto/bcrypt"
"gopkg.in/yaml.v2"
+
+ "github.com/matrix-org/dendrite/setup/config"
)
func main() {
@@ -82,6 +83,12 @@ func main() {
EnableInbound: true,
EnableOutbound: true,
}
+ cfg.SyncAPI.Fulltext = config.Fulltext{
+ Enabled: true,
+ IndexPath: config.Path(filepath.Join(*dirPath, "fulltextindex")),
+ InMemory: true,
+ Language: "en",
+ }
}
} else {
var err error
diff --git a/dendrite-sample.monolith.yaml b/dendrite-sample.monolith.yaml
index f1758f54..3cad17da 100644
--- a/dendrite-sample.monolith.yaml
+++ b/dendrite-sample.monolith.yaml
@@ -275,10 +275,15 @@ sync_api:
# address of the client. This is likely required if Dendrite is running behind
# a reverse proxy server.
# real_ip_header: X-Real-IP
- fulltext:
+
+ # Configuration for the fulltext search
+ search:
enabled: false
+ # The path where the fulltext index will be created in.
index_path: "./fulltextindex"
- language: "en" # more possible languages can be found at https://github.com/blevesearch/bleve/tree/master/analysis/lang
+ # The language most likely to be used on the server - used when indexing, to ensure the returned results match the expectations.
+ # A full list of possible languages can be found at https://github.com/blevesearch/bleve/tree/master/analysis/lang
+ language: "en"
# Configuration for the User API.
user_api:
diff --git a/dendrite-sample.polylith.yaml b/dendrite-sample.polylith.yaml
index 97d10825..e58062fe 100644
--- a/dendrite-sample.polylith.yaml
+++ b/dendrite-sample.polylith.yaml
@@ -326,10 +326,15 @@ sync_api:
max_open_conns: 10
max_idle_conns: 2
conn_max_lifetime: -1
- fulltext:
+
+ # Configuration for the fulltext search
+ search:
enabled: false
+ # The path where the fulltext index will be created in.
index_path: "./fulltextindex"
- language: "en" # more possible languages can be found at https://github.com/blevesearch/bleve/tree/master/analysis/lang
+ # The language most likely to be used on the server - used when indexing, to ensure the returned results match the expectations.
+ # A full list of possible languages can be found at https://github.com/blevesearch/bleve/tree/master/analysis/lang
+ language: "en"
# This option controls which HTTP header to inspect to find the real remote IP
# address of the client. This is likely required if Dendrite is running behind
diff --git a/docs/administration/4_adminapi.md b/docs/administration/4_adminapi.md
index a34bfde1..1712bb1b 100644
--- a/docs/administration/4_adminapi.md
+++ b/docs/administration/4_adminapi.md
@@ -57,6 +57,11 @@ Request body format:
Reset the password of a local user. The `localpart` is the username only, i.e. if
the full user ID is `@alice:domain.com` then the local part is `alice`.
+## GET `/_dendrite/admin/fulltext/reindex`
+
+This endpoint instructs Dendrite to reindex all searchable events (`m.room.message`, `m.room.topic` and `m.room.name`). An empty JSON body will be returned immediately.
+Indexing is done in the background, the server logs every 1000 events (or below) when they are being indexed. Once reindexing is done, you'll see something along the lines `Indexed 69586 events in 53.68223182s` in your debug logs.
+
## POST `/_synapse/admin/v1/send_server_notice`
Request body format:
diff --git a/docs/installation/7_configuration.md b/docs/installation/7_configuration.md
index 8fbe71c4..67cd339c 100644
--- a/docs/installation/7_configuration.md
+++ b/docs/installation/7_configuration.md
@@ -140,12 +140,12 @@ room_server:
## Fulltext search
-Dendrite supports experimental fulltext indexing using [Bleve](https://github.com/blevesearch/bleve), it is configured in the `sync_api` section as follows. Depending on the language most likely to be used on the server, it might make sense to change the `language` used when indexing, to ensure the returned results match the expections. A full list of possible languages can be found [here](https://github.com/blevesearch/bleve/tree/master/analysis/lang).
+Dendrite supports experimental fulltext indexing using [Bleve](https://github.com/blevesearch/bleve), it is configured in the `sync_api` section as follows. Depending on the language most likely to be used on the server, it might make sense to change the `language` used when indexing, to ensure the returned results match the expectations. A full list of possible languages can be found [here](https://github.com/blevesearch/bleve/tree/master/analysis/lang).
```yaml
sync_api:
# ...
- fulltext:
+ search:
enabled: false
index_path: "./fulltextindex"
language: "en"
diff --git a/internal/fulltext/bleve.go b/internal/fulltext/bleve.go
index b07c0e51..da8932f5 100644
--- a/internal/fulltext/bleve.go
+++ b/internal/fulltext/bleve.go
@@ -22,8 +22,9 @@ import (
"github.com/blevesearch/bleve/v2"
"github.com/blevesearch/bleve/v2/mapping"
- "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
+
+ "github.com/matrix-org/dendrite/setup/config"
)
// Search contains all existing bleve.Index
diff --git a/internal/fulltext/bleve_test.go b/internal/fulltext/bleve_test.go
index 84a28242..d16397a4 100644
--- a/internal/fulltext/bleve_test.go
+++ b/internal/fulltext/bleve_test.go
@@ -27,11 +27,11 @@ import (
func mustOpenIndex(t *testing.T, tempDir string) *fulltext.Search {
t.Helper()
- cfg := config.Fulltext{}
- cfg.Defaults(config.DefaultOpts{
- Generate: true,
- Monolithic: true,
- })
+ cfg := config.Fulltext{
+ Enabled: true,
+ InMemory: true,
+ Language: "en",
+ }
if tempDir != "" {
cfg.IndexPath = config.Path(tempDir)
cfg.InMemory = false
diff --git a/setup/base/base.go b/setup/base/base.go
index 32716c76..0636c7b8 100644
--- a/setup/base/base.go
+++ b/setup/base/base.go
@@ -37,16 +37,13 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
+ "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/sqlutil"
- "github.com/matrix-org/dendrite/internal"
- "github.com/matrix-org/dendrite/setup/jetstream"
- "github.com/matrix-org/dendrite/setup/process"
-
"github.com/gorilla/mux"
"github.com/kardianos/minwinsvc"
@@ -61,6 +58,8 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
rsinthttp "github.com/matrix-org/dendrite/roomserver/inthttp"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/setup/process"
userapi "github.com/matrix-org/dendrite/userapi/api"
userapiinthttp "github.com/matrix-org/dendrite/userapi/inthttp"
)
diff --git a/setup/config/config_syncapi.go b/setup/config/config_syncapi.go
index c890b005..edef22c9 100644
--- a/setup/config/config_syncapi.go
+++ b/setup/config/config_syncapi.go
@@ -10,7 +10,7 @@ type SyncAPI struct {
RealIPHeader string `yaml:"real_ip_header"`
- Fulltext Fulltext `yaml:"fulltext"`
+ Fulltext Fulltext `yaml:"search"`
}
func (c *SyncAPI) Defaults(opts DefaultOpts) {
@@ -52,16 +52,12 @@ func (f *Fulltext) Defaults(opts DefaultOpts) {
f.Enabled = false
f.IndexPath = "./fulltextindex"
f.Language = "en"
- if opts.Generate {
- f.Enabled = true
- f.InMemory = true
- }
}
func (f *Fulltext) Verify(configErrs *ConfigErrors, isMonolith bool) {
if !f.Enabled {
return
}
- checkNotEmpty(configErrs, "syncapi.fulltext.index_path", string(f.IndexPath))
- checkNotEmpty(configErrs, "syncapi.fulltext.language", f.Language)
+ checkNotEmpty(configErrs, "syncapi.search.index_path", string(f.IndexPath))
+ checkNotEmpty(configErrs, "syncapi.search.language", f.Language)
}
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index a170a6ec..b11ed4f5 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -17,14 +17,18 @@ package consumers
import (
"context"
"encoding/json"
+ "strings"
+ "time"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
+ "github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/internal/eventutil"
+ "github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
@@ -35,14 +39,18 @@ import (
// OutputClientDataConsumer consumes events that originated in the client API server.
type OutputClientDataConsumer struct {
- ctx context.Context
- jetstream nats.JetStreamContext
- durable string
- topic string
- db storage.Database
- stream types.StreamProvider
- notifier *notifier.Notifier
- serverName gomatrixserverlib.ServerName
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ nats *nats.Conn
+ durable string
+ topic string
+ topicReIndex string
+ db storage.Database
+ stream types.StreamProvider
+ notifier *notifier.Notifier
+ serverName gomatrixserverlib.ServerName
+ fts *fulltext.Search
+ cfg *config.SyncAPI
}
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
@@ -50,24 +58,93 @@ func NewOutputClientDataConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
js nats.JetStreamContext,
+ nats *nats.Conn,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
+ fts *fulltext.Search,
) *OutputClientDataConsumer {
return &OutputClientDataConsumer{
- ctx: process.Context(),
- jetstream: js,
- topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
- durable: cfg.Matrix.JetStream.Durable("SyncAPIAccountDataConsumer"),
- db: store,
- notifier: notifier,
- stream: stream,
- serverName: cfg.Matrix.ServerName,
+ ctx: process.Context(),
+ jetstream: js,
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
+ topicReIndex: cfg.Matrix.JetStream.Prefixed(jetstream.InputFulltextReindex),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIAccountDataConsumer"),
+ nats: nats,
+ db: store,
+ notifier: notifier,
+ stream: stream,
+ serverName: cfg.Matrix.ServerName,
+ fts: fts,
+ cfg: cfg,
}
}
// Start consuming from room servers
func (s *OutputClientDataConsumer) Start() error {
+ _, err := s.nats.Subscribe(s.topicReIndex, func(msg *nats.Msg) {
+ if err := msg.Ack(); err != nil {
+ return
+ }
+ if !s.cfg.Fulltext.Enabled {
+ logrus.Warn("Fulltext indexing is disabled")
+ return
+ }
+ ctx := context.Background()
+ logrus.Debugf("Starting to index events")
+ var offset int
+ start := time.Now()
+ count := 0
+ var id int64 = 0
+ for {
+ evs, err := s.db.ReIndex(ctx, 1000, id)
+ if err != nil {
+ logrus.WithError(err).Errorf("unable to get events to index")
+ return
+ }
+ if len(evs) == 0 {
+ break
+ }
+ logrus.Debugf("Indexing %d events", len(evs))
+ elements := make([]fulltext.IndexElement, 0, len(evs))
+
+ for streamPos, ev := range evs {
+ id = streamPos
+ e := fulltext.IndexElement{
+ EventID: ev.EventID(),
+ RoomID: ev.RoomID(),
+ StreamPosition: streamPos,
+ }
+ e.SetContentType(ev.Type())
+
+ switch ev.Type() {
+ case "m.room.message":
+ e.Content = gjson.GetBytes(ev.Content(), "body").String()
+ case gomatrixserverlib.MRoomName:
+ e.Content = gjson.GetBytes(ev.Content(), "name").String()
+ case gomatrixserverlib.MRoomTopic:
+ e.Content = gjson.GetBytes(ev.Content(), "topic").String()
+ default:
+ continue
+ }
+
+ if strings.TrimSpace(e.Content) == "" {
+ continue
+ }
+ elements = append(elements, e)
+ }
+ if err = s.fts.Index(elements...); err != nil {
+ logrus.WithError(err).Error("unable to index events")
+ continue
+ }
+ offset += len(evs)
+ count += len(elements)
+ }
+ logrus.Debugf("Indexed %d events in %v", count, time.Since(start))
+ })
+ if err != nil {
+ return err
+ }
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, 1,
s.onMessage, nats.DeliverAll(), nats.ManualAck(),
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 0964ae20..3756ad75 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -24,7 +24,9 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
+ "github.com/tidwall/gjson"
+ "github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
@@ -46,6 +48,7 @@ type OutputRoomEventConsumer struct {
pduStream types.StreamProvider
inviteStream types.StreamProvider
notifier *notifier.Notifier
+ fts *fulltext.Search
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@@ -58,6 +61,7 @@ func NewOutputRoomEventConsumer(
pduStream types.StreamProvider,
inviteStream types.StreamProvider,
rsAPI api.SyncRoomserverAPI,
+ fts *fulltext.Search,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
ctx: process.Context(),
@@ -70,6 +74,7 @@ func NewOutputRoomEventConsumer(
pduStream: pduStream,
inviteStream: inviteStream,
rsAPI: rsAPI,
+ fts: fts,
}
}
@@ -251,6 +256,12 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}).Panicf("roomserver output log: write new event failure")
return nil
}
+ if err = s.writeFTS(ev, pduPos); err != nil {
+ log.WithFields(log.Fields{
+ "event_id": ev.EventID(),
+ "type": ev.Type(),
+ }).WithError(err).Warn("failed to index fulltext element")
+ }
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
@@ -295,6 +306,13 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
return nil
}
+ if err = s.writeFTS(ev, pduPos); err != nil {
+ log.WithFields(log.Fields{
+ "event_id": ev.EventID(),
+ "type": ev.Type(),
+ }).WithError(err).Warn("failed to index fulltext element")
+ }
+
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
return err
@@ -451,3 +469,39 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.Head
event.Event, err = event.SetUnsigned(prev)
return event, err
}
+
+func (s *OutputRoomEventConsumer) writeFTS(ev *gomatrixserverlib.HeaderedEvent, pduPosition types.StreamPosition) error {
+ if !s.cfg.Fulltext.Enabled {
+ return nil
+ }
+ e := fulltext.IndexElement{
+ EventID: ev.EventID(),
+ RoomID: ev.RoomID(),
+ StreamPosition: int64(pduPosition),
+ }
+ e.SetContentType(ev.Type())
+
+ switch ev.Type() {
+ case "m.room.message":
+ e.Content = gjson.GetBytes(ev.Content(), "body").String()
+ case gomatrixserverlib.MRoomName:
+ e.Content = gjson.GetBytes(ev.Content(), "name").String()
+ case gomatrixserverlib.MRoomTopic:
+ e.Content = gjson.GetBytes(ev.Content(), "topic").String()
+ case gomatrixserverlib.MRoomRedaction:
+ log.Tracef("Redacting event: %s", ev.Redacts())
+ if err := s.fts.Delete(ev.Redacts()); err != nil {
+ return fmt.Errorf("failed to delete entry from fulltext index: %w", err)
+ }
+ return nil
+ default:
+ return nil
+ }
+ if e.Content != "" {
+ log.Tracef("Indexing element: %+v", e)
+ if err := s.fts.Index(e); err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go
index 13c4e9d8..1ebdfe60 100644
--- a/syncapi/routing/context.go
+++ b/syncapi/routing/context.go
@@ -37,11 +37,11 @@ import (
type ContextRespsonse struct {
End string `json:"end"`
- Event gomatrixserverlib.ClientEvent `json:"event"`
+ Event *gomatrixserverlib.ClientEvent `json:"event,omitempty"`
EventsAfter []gomatrixserverlib.ClientEvent `json:"events_after,omitempty"`
EventsBefore []gomatrixserverlib.ClientEvent `json:"events_before,omitempty"`
Start string `json:"start"`
- State []gomatrixserverlib.ClientEvent `json:"state"`
+ State []gomatrixserverlib.ClientEvent `json:"state,omitempty"`
}
func Context(
@@ -162,8 +162,9 @@ func Context(
eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfterFiltered, gomatrixserverlib.FormatAll)
newState := applyLazyLoadMembers(device, filter, eventsAfterClient, eventsBeforeClient, state, lazyLoadCache)
+ ev := gomatrixserverlib.HeaderedToClientEvent(&requestedEvent, gomatrixserverlib.FormatAll)
response := ContextRespsonse{
- Event: gomatrixserverlib.HeaderedToClientEvent(&requestedEvent, gomatrixserverlib.FormatAll),
+ Event: &ev,
EventsAfter: eventsAfterClient,
EventsBefore: eventsBeforeClient,
State: gomatrixserverlib.HeaderedToClientEvents(newState, gomatrixserverlib.FormatAll),
diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go
index 6bc495d8..8f84a134 100644
--- a/syncapi/routing/routing.go
+++ b/syncapi/routing/routing.go
@@ -18,15 +18,18 @@ import (
"net/http"
"github.com/gorilla/mux"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+
+ "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/caching"
+ "github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
userapi "github.com/matrix-org/dendrite/userapi/api"
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/matrix-org/util"
)
// Setup configures the given mux with sync-server listeners
@@ -40,6 +43,7 @@ func Setup(
rsAPI api.SyncRoomserverAPI,
cfg *config.SyncAPI,
lazyLoadCache caching.LazyLoadCache,
+ fts *fulltext.Search,
) {
v3mux := csMux.PathPrefix("/{apiversion:(?:r0|v3)}/").Subrouter()
@@ -95,4 +99,24 @@ func Setup(
)
}),
).Methods(http.MethodGet, http.MethodOptions)
+
+ v3mux.Handle("/search",
+ httputil.MakeAuthAPI("search", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
+ if !cfg.Fulltext.Enabled {
+ return util.JSONResponse{
+ Code: http.StatusNotImplemented,
+ JSON: jsonerror.Unknown("Search has been disabled by the server administrator."),
+ }
+ }
+ var nextBatch *string
+ if err := req.ParseForm(); err != nil {
+ return jsonerror.InternalServerError()
+ }
+ if req.Form.Has("next_batch") {
+ nb := req.FormValue("next_batch")
+ nextBatch = &nb
+ }
+ return Search(req, device, syncDB, fts, nextBatch)
+ }),
+ ).Methods(http.MethodPost, http.MethodOptions)
}
diff --git a/syncapi/routing/search.go b/syncapi/routing/search.go
new file mode 100644
index 00000000..341efeb1
--- /dev/null
+++ b/syncapi/routing/search.go
@@ -0,0 +1,344 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package routing
+
+import (
+ "context"
+ "net/http"
+ "sort"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/blevesearch/bleve/v2/search"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ "github.com/sirupsen/logrus"
+ "github.com/tidwall/gjson"
+
+ "github.com/matrix-org/dendrite/clientapi/httputil"
+ "github.com/matrix-org/dendrite/clientapi/jsonerror"
+ "github.com/matrix-org/dendrite/internal/fulltext"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/userapi/api"
+)
+
+// nolint:gocyclo
+func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts *fulltext.Search, from *string) util.JSONResponse {
+ start := time.Now()
+ var (
+ searchReq SearchRequest
+ err error
+ ctx = req.Context()
+ )
+ resErr := httputil.UnmarshalJSONRequest(req, &searchReq)
+ if resErr != nil {
+ logrus.Error("failed to unmarshal search request")
+ return *resErr
+ }
+
+ nextBatch := 0
+ if from != nil && *from != "" {
+ nextBatch, err = strconv.Atoi(*from)
+ if err != nil {
+ return jsonerror.InternalServerError()
+ }
+ }
+
+ if searchReq.SearchCategories.RoomEvents.Filter.Limit == 0 {
+ searchReq.SearchCategories.RoomEvents.Filter.Limit = 5
+ }
+
+ // only search rooms the user is actually joined to
+ joinedRooms, err := syncDB.RoomIDsWithMembership(ctx, device.UserID, "join")
+ if err != nil {
+ return jsonerror.InternalServerError()
+ }
+ if len(joinedRooms) == 0 {
+ return util.JSONResponse{
+ Code: http.StatusNotFound,
+ JSON: jsonerror.NotFound("User not joined to any rooms."),
+ }
+ }
+ joinedRoomsMap := make(map[string]struct{}, len(joinedRooms))
+ for _, roomID := range joinedRooms {
+ joinedRoomsMap[roomID] = struct{}{}
+ }
+ rooms := []string{}
+ if searchReq.SearchCategories.RoomEvents.Filter.Rooms != nil {
+ for _, roomID := range *searchReq.SearchCategories.RoomEvents.Filter.Rooms {
+ if _, ok := joinedRoomsMap[roomID]; ok {
+ rooms = append(rooms, roomID)
+ }
+ }
+ } else {
+ rooms = joinedRooms
+ }
+
+ if len(rooms) == 0 {
+ return util.JSONResponse{
+ Code: http.StatusForbidden,
+ JSON: jsonerror.Unknown("User not allowed to search in this room(s)."),
+ }
+ }
+
+ orderByTime := searchReq.SearchCategories.RoomEvents.OrderBy == "recent"
+
+ result, err := fts.Search(
+ searchReq.SearchCategories.RoomEvents.SearchTerm,
+ rooms,
+ searchReq.SearchCategories.RoomEvents.Keys,
+ searchReq.SearchCategories.RoomEvents.Filter.Limit,
+ nextBatch,
+ orderByTime,
+ )
+ if err != nil {
+ logrus.WithError(err).Error("failed to search fulltext")
+ return jsonerror.InternalServerError()
+ }
+ logrus.Debugf("Search took %s", result.Took)
+
+ // From was specified but empty, return no results, only the count
+ if from != nil && *from == "" {
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: SearchResponse{
+ SearchCategories: SearchCategories{
+ RoomEvents: RoomEvents{
+ Count: int(result.Total),
+ NextBatch: nil,
+ },
+ },
+ },
+ }
+ }
+
+ results := []Result{}
+
+ wantEvents := make([]string, 0, len(result.Hits))
+ eventScore := make(map[string]*search.DocumentMatch)
+
+ for _, hit := range result.Hits {
+ wantEvents = append(wantEvents, hit.ID)
+ eventScore[hit.ID] = hit
+ }
+
+ // Filter on m.room.message, as otherwise we also get events like m.reaction
+ // which "breaks" displaying results in Element Web.
+ types := []string{"m.room.message"}
+ roomFilter := &gomatrixserverlib.RoomEventFilter{
+ Rooms: &rooms,
+ Types: &types,
+ }
+
+ evs, err := syncDB.Events(ctx, wantEvents)
+ if err != nil {
+ logrus.WithError(err).Error("failed to get events from database")
+ return jsonerror.InternalServerError()
+ }
+
+ groups := make(map[string]RoomResult)
+ knownUsersProfiles := make(map[string]ProfileInfo)
+
+ // Sort the events by depth, as the returned values aren't ordered
+ if orderByTime {
+ sort.Slice(evs, func(i, j int) bool {
+ return evs[i].Depth() > evs[j].Depth()
+ })
+ }
+
+ stateForRooms := make(map[string][]gomatrixserverlib.ClientEvent)
+ for _, event := range evs {
+ eventsBefore, eventsAfter, err := contextEvents(ctx, syncDB, event, roomFilter, searchReq)
+ if err != nil {
+ logrus.WithError(err).Error("failed to get context events")
+ return jsonerror.InternalServerError()
+ }
+ startToken, endToken, err := getStartEnd(ctx, syncDB, eventsBefore, eventsAfter)
+ if err != nil {
+ logrus.WithError(err).Error("failed to get start/end")
+ return jsonerror.InternalServerError()
+ }
+
+ profileInfos := make(map[string]ProfileInfo)
+ for _, ev := range append(eventsBefore, eventsAfter...) {
+ profile, ok := knownUsersProfiles[event.Sender()]
+ if !ok {
+ stateEvent, err := syncDB.GetStateEvent(ctx, ev.RoomID(), gomatrixserverlib.MRoomMember, ev.Sender())
+ if err != nil {
+ logrus.WithError(err).WithField("user_id", event.Sender()).Warn("failed to query userprofile")
+ continue
+ }
+ if stateEvent == nil {
+ continue
+ }
+ profile = ProfileInfo{
+ AvatarURL: gjson.GetBytes(stateEvent.Content(), "avatar_url").Str,
+ DisplayName: gjson.GetBytes(stateEvent.Content(), "displayname").Str,
+ }
+ knownUsersProfiles[event.Sender()] = profile
+ }
+ profileInfos[ev.Sender()] = profile
+ }
+
+ results = append(results, Result{
+ Context: SearchContextResponse{
+ Start: startToken.String(),
+ End: endToken.String(),
+ EventsAfter: gomatrixserverlib.HeaderedToClientEvents(eventsAfter, gomatrixserverlib.FormatSync),
+ EventsBefore: gomatrixserverlib.HeaderedToClientEvents(eventsBefore, gomatrixserverlib.FormatSync),
+ ProfileInfo: profileInfos,
+ },
+ Rank: eventScore[event.EventID()].Score,
+ Result: gomatrixserverlib.HeaderedToClientEvent(event, gomatrixserverlib.FormatAll),
+ })
+ roomGroup := groups[event.RoomID()]
+ roomGroup.Results = append(roomGroup.Results, event.EventID())
+ groups[event.RoomID()] = roomGroup
+ if _, ok := stateForRooms[event.RoomID()]; searchReq.SearchCategories.RoomEvents.IncludeState && !ok {
+ stateFilter := gomatrixserverlib.DefaultStateFilter()
+ state, err := syncDB.CurrentState(ctx, event.RoomID(), &stateFilter, nil)
+ if err != nil {
+ logrus.WithError(err).Error("unable to get current state")
+ return jsonerror.InternalServerError()
+ }
+ stateForRooms[event.RoomID()] = gomatrixserverlib.HeaderedToClientEvents(state, gomatrixserverlib.FormatSync)
+ }
+ }
+
+ var nextBatchResult *string = nil
+ if int(result.Total) > nextBatch+len(results) {
+ nb := strconv.Itoa(len(results) + nextBatch)
+ nextBatchResult = &nb
+ } else if int(result.Total) == nextBatch+len(results) {
+ // Sytest expects a next_batch even if we don't actually have any more results
+ nb := ""
+ nextBatchResult = &nb
+ }
+
+ res := SearchResponse{
+ SearchCategories: SearchCategories{
+ RoomEvents: RoomEvents{
+ Count: int(result.Total),
+ Groups: Groups{RoomID: groups},
+ Results: results,
+ NextBatch: nextBatchResult,
+ Highlights: strings.Split(searchReq.SearchCategories.RoomEvents.SearchTerm, " "),
+ State: stateForRooms,
+ },
+ },
+ }
+
+ logrus.Debugf("Full search request took %v", time.Since(start))
+
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: res,
+ }
+}
+
+// contextEvents returns the events around a given eventID
+func contextEvents(
+ ctx context.Context,
+ syncDB storage.Database,
+ event *gomatrixserverlib.HeaderedEvent,
+ roomFilter *gomatrixserverlib.RoomEventFilter,
+ searchReq SearchRequest,
+) ([]*gomatrixserverlib.HeaderedEvent, []*gomatrixserverlib.HeaderedEvent, error) {
+ id, _, err := syncDB.SelectContextEvent(ctx, event.RoomID(), event.EventID())
+ if err != nil {
+ logrus.WithError(err).Error("failed to query context event")
+ return nil, nil, err
+ }
+ roomFilter.Limit = searchReq.SearchCategories.RoomEvents.EventContext.BeforeLimit
+ eventsBefore, err := syncDB.SelectContextBeforeEvent(ctx, id, event.RoomID(), roomFilter)
+ if err != nil {
+ logrus.WithError(err).Error("failed to query before context event")
+ return nil, nil, err
+ }
+ roomFilter.Limit = searchReq.SearchCategories.RoomEvents.EventContext.AfterLimit
+ _, eventsAfter, err := syncDB.SelectContextAfterEvent(ctx, id, event.RoomID(), roomFilter)
+ if err != nil {
+ logrus.WithError(err).Error("failed to query after context event")
+ return nil, nil, err
+ }
+ return eventsBefore, eventsAfter, err
+}
+
+type SearchRequest struct {
+ SearchCategories struct {
+ RoomEvents struct {
+ EventContext struct {
+ AfterLimit int `json:"after_limit,omitempty"`
+ BeforeLimit int `json:"before_limit,omitempty"`
+ IncludeProfile bool `json:"include_profile,omitempty"`
+ } `json:"event_context"`
+ Filter gomatrixserverlib.StateFilter `json:"filter"`
+ Groupings struct {
+ GroupBy []struct {
+ Key string `json:"key"`
+ } `json:"group_by"`
+ } `json:"groupings"`
+ IncludeState bool `json:"include_state"`
+ Keys []string `json:"keys"`
+ OrderBy string `json:"order_by"`
+ SearchTerm string `json:"search_term"`
+ } `json:"room_events"`
+ } `json:"search_categories"`
+}
+
+type SearchResponse struct {
+ SearchCategories SearchCategories `json:"search_categories"`
+}
+type RoomResult struct {
+ NextBatch *string `json:"next_batch,omitempty"`
+ Order int `json:"order"`
+ Results []string `json:"results"`
+}
+
+type Groups struct {
+ RoomID map[string]RoomResult `json:"room_id"`
+}
+
+type Result struct {
+ Context SearchContextResponse `json:"context"`
+ Rank float64 `json:"rank"`
+ Result gomatrixserverlib.ClientEvent `json:"result"`
+}
+
+type SearchContextResponse struct {
+ End string `json:"end"`
+ EventsAfter []gomatrixserverlib.ClientEvent `json:"events_after"`
+ EventsBefore []gomatrixserverlib.ClientEvent `json:"events_before"`
+ Start string `json:"start"`
+ ProfileInfo map[string]ProfileInfo `json:"profile_info"`
+}
+
+type ProfileInfo struct {
+ AvatarURL string `json:"avatar_url"`
+ DisplayName string `json:"display_name"`
+}
+
+type RoomEvents struct {
+ Count int `json:"count"`
+ Groups Groups `json:"groups"`
+ Highlights []string `json:"highlights"`
+ NextBatch *string `json:"next_batch,omitempty"`
+ Results []Result `json:"results"`
+ State map[string][]gomatrixserverlib.ClientEvent `json:"state,omitempty"`
+}
+type SearchCategories struct {
+ RoomEvents RoomEvents `json:"room_events"`
+}
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index ad3be420..dd03365e 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -161,6 +161,7 @@ type Database interface {
// returns "leave", the topological position and no error. If an error occurs, other than sql.ErrNoRows, returns that and an empty
// string as the membership.
SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error)
+ ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error)
}
type Presence interface {
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 041f9906..20a9ea42 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -166,6 +166,8 @@ const selectContextAfterEventSQL = "" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id ASC LIMIT $3"
+const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE id > $1 AND type = ANY($2) ORDER BY id ASC LIMIT $3"
+
type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
@@ -180,6 +182,7 @@ type outputRoomEventsStatements struct {
selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt
+ selectSearchStmt *sql.Stmt
}
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@@ -215,6 +218,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
{&s.selectContextEventStmt, selectContextEventSQL},
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
+ {&s.selectSearchStmt, selectSearchSQL},
}.Prepare(db)
}
@@ -632,3 +636,27 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
}
return result, rows.Err()
}
+
+func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
+ rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, afterID, pq.StringArray(types), limit)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed")
+
+ var eventID string
+ var id int64
+ result := make(map[int64]gomatrixserverlib.HeaderedEvent)
+ for rows.Next() {
+ var ev gomatrixserverlib.HeaderedEvent
+ var eventBytes []byte
+ if err = rows.Scan(&id, &eventID, &eventBytes); err != nil {
+ return nil, err
+ }
+ if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ return nil, err
+ }
+ result[id] = ev
+ }
+ return result, rows.Err()
+}
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index 215bad3a..47e3a991 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -1093,3 +1093,11 @@ func (d *Database) MaxStreamPositionForPresence(ctx context.Context) (types.Stre
func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) {
return d.Memberships.SelectMembershipForUser(ctx, nil, roomID, userID, pos)
}
+
+func (s *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
+ return s.OutputEvents.ReIndex(ctx, nil, limit, afterID, []string{
+ gomatrixserverlib.MRoomName,
+ gomatrixserverlib.MRoomTopic,
+ "m.room.message",
+ })
+}
diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go
index 1626e32e..6269f4fd 100644
--- a/syncapi/storage/sqlite3/output_room_events_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_table.go
@@ -115,6 +115,8 @@ const selectContextAfterEventSQL = "" +
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
+const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE type IN ($1) AND id > $2 LIMIT $3 ORDER BY id ASC"
+
type outputRoomEventsStatements struct {
db *sql.DB
streamIDStatements *StreamIDStatements
@@ -125,6 +127,7 @@ type outputRoomEventsStatements struct {
selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt
+ //selectSearchStmt *sql.Stmt - prepared at runtime
}
func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Events, error) {
@@ -157,6 +160,7 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even
{&s.selectContextEventStmt, selectContextEventSQL},
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
+ //{&s.selectSearchStmt, selectSearchSQL}, - prepared at runtime
}.Prepare(db)
}
@@ -628,3 +632,40 @@ func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs [
}
return
}
+
+func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
+ params := make([]interface{}, len(types))
+ for i := range types {
+ params[i] = types[i]
+ }
+ params = append(params, afterID)
+ params = append(params, limit)
+ selectSQL := strings.Replace(selectSearchSQL, "($1)", sqlutil.QueryVariadic(len(types)), 1)
+
+ stmt, err := s.db.Prepare(selectSQL)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, stmt, "selectEvents: stmt.close() failed")
+ rows, err := stmt.QueryContext(ctx, params...)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed")
+
+ var eventID string
+ var id int64
+ result := make(map[int64]gomatrixserverlib.HeaderedEvent)
+ for rows.Next() {
+ var ev gomatrixserverlib.HeaderedEvent
+ var eventBytes []byte
+ if err = rows.Scan(&id, &eventID, &eventBytes); err != nil {
+ return nil, err
+ }
+ if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ return nil, err
+ }
+ result[id] = ev
+ }
+ return result, rows.Err()
+}
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 9a873c2e..2a6d6fa8 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -75,6 +75,7 @@ type Events interface {
SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
+ ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error)
}
// Topology keeps track of the depths and stream positions for all events.
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index f5d00f36..be19310f 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -88,14 +88,15 @@ func AddPublicRoutes(
roomConsumer := consumers.NewOutputRoomEventConsumer(
base.ProcessContext, cfg, js, syncDB, notifier, streams.PDUStreamProvider,
- streams.InviteStreamProvider, rsAPI,
+ streams.InviteStreamProvider, rsAPI, base.Fulltext,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
}
clientConsumer := consumers.NewOutputClientDataConsumer(
- base.ProcessContext, cfg, js, syncDB, notifier, streams.AccountDataStreamProvider,
+ base.ProcessContext, cfg, js, natsClient, syncDB, notifier,
+ streams.AccountDataStreamProvider, base.Fulltext,
)
if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer")
@@ -131,6 +132,6 @@ func AddPublicRoutes(
routing.Setup(
base.PublicClientAPIMux, requestPool, syncDB, userAPI,
- rsAPI, cfg, base.Caches,
+ rsAPI, cfg, base.Caches, base.Fulltext,
)
}