aboutsummaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2023-03-17 12:09:45 +0100
committerGitHub <noreply@github.com>2023-03-17 11:09:45 +0000
commit5579121c6f27105342a2aea05cf9a3119d73cecb (patch)
tree1d8b7bec90079b6f693585d306c19019ea426870 /internal
parentd88f71ab71a60348518f7fa6735ac9f0bfb472c3 (diff)
Preparations for removing `BaseDendrite` (#3016)
Preparations to actually remove/replace `BaseDendrite`. Quite a few changes: - SyncAPI accepts an `fulltext.Indexer` interface (fulltext is removed from `BaseDendrite`) - Caches are removed from `BaseDendrite` - Introduces a `Router` struct (likely to change) - also fixes #2903 - Introduces a `sqlutil.ConnectionManager`, which should remove `base.DatabaseConnection` later on - probably more
Diffstat (limited to 'internal')
-rw-r--r--internal/caching/impl_ristretto.go5
-rw-r--r--internal/fulltext/bleve.go17
-rw-r--r--internal/fulltext/bleve_test.go29
-rw-r--r--internal/fulltext/bleve_wasm.go12
-rw-r--r--internal/httputil/routing.go52
-rw-r--r--internal/httputil/routing_test.go38
-rw-r--r--internal/sqlutil/connection_manager.go54
-rw-r--r--internal/sqlutil/connection_manager_test.go56
8 files changed, 245 insertions, 18 deletions
diff --git a/internal/caching/impl_ristretto.go b/internal/caching/impl_ristretto.go
index 106b9c99..7663ddcb 100644
--- a/internal/caching/impl_ristretto.go
+++ b/internal/caching/impl_ristretto.go
@@ -46,6 +46,11 @@ const (
eventStateKeyNIDCache
)
+const (
+ DisableMetrics = false
+ EnableMetrics = true
+)
+
func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enablePrometheus bool) *Caches {
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: int64((maxCost / 1024) * 10), // 10 counters per 1KB data, affects bloom filter size
diff --git a/internal/fulltext/bleve.go b/internal/fulltext/bleve.go
index 7187861d..08cef8a9 100644
--- a/internal/fulltext/bleve.go
+++ b/internal/fulltext/bleve.go
@@ -18,10 +18,10 @@
package fulltext
import (
+ "context"
"strings"
"github.com/blevesearch/bleve/v2"
-
// side effect imports to allow all possible languages
_ "github.com/blevesearch/bleve/v2/analysis/lang/ar"
_ "github.com/blevesearch/bleve/v2/analysis/lang/cjk"
@@ -55,6 +55,13 @@ type Search struct {
FulltextIndex bleve.Index
}
+type Indexer interface {
+ Index(elements ...IndexElement) error
+ Delete(eventID string) error
+ Search(term string, roomIDs, keys []string, limit, from int, orderByStreamPos bool) (*bleve.SearchResult, error)
+ Close() error
+}
+
// IndexElement describes the layout of an element to index
type IndexElement struct {
EventID string
@@ -77,12 +84,18 @@ func (i *IndexElement) SetContentType(v string) {
}
// New opens a new/existing fulltext index
-func New(cfg config.Fulltext) (fts *Search, err error) {
+func New(ctx context.Context, cfg config.Fulltext) (fts *Search, err error) {
fts = &Search{}
fts.FulltextIndex, err = openIndex(cfg)
if err != nil {
return nil, err
}
+ go func() {
+ // Wait for the context (should be from process.ProcessContext) to be
+ // done, indicating that Dendrite is shutting down.
+ <-ctx.Done()
+ _ = fts.Close()
+ }()
return fts, nil
}
diff --git a/internal/fulltext/bleve_test.go b/internal/fulltext/bleve_test.go
index d16397a4..6ce69145 100644
--- a/internal/fulltext/bleve_test.go
+++ b/internal/fulltext/bleve_test.go
@@ -18,6 +18,7 @@ import (
"reflect"
"testing"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@@ -25,7 +26,7 @@ import (
"github.com/matrix-org/dendrite/setup/config"
)
-func mustOpenIndex(t *testing.T, tempDir string) *fulltext.Search {
+func mustOpenIndex(t *testing.T, tempDir string) (*fulltext.Search, *process.ProcessContext) {
t.Helper()
cfg := config.Fulltext{
Enabled: true,
@@ -36,11 +37,12 @@ func mustOpenIndex(t *testing.T, tempDir string) *fulltext.Search {
cfg.IndexPath = config.Path(tempDir)
cfg.InMemory = false
}
- fts, err := fulltext.New(cfg)
+ ctx := process.NewProcessContext()
+ fts, err := fulltext.New(ctx.Context(), cfg)
if err != nil {
t.Fatal("failed to open fulltext index:", err)
}
- return fts
+ return fts, ctx
}
func mustAddTestData(t *testing.T, fts *fulltext.Search, firstStreamPos int64) (eventIDs, roomIDs []string) {
@@ -93,19 +95,17 @@ func mustAddTestData(t *testing.T, fts *fulltext.Search, firstStreamPos int64) (
func TestOpen(t *testing.T) {
dataDir := t.TempDir()
- fts := mustOpenIndex(t, dataDir)
- if err := fts.Close(); err != nil {
- t.Fatal("unable to close fulltext index", err)
- }
+ _, ctx := mustOpenIndex(t, dataDir)
+ ctx.ShutdownDendrite()
// open existing index
- fts = mustOpenIndex(t, dataDir)
- defer fts.Close()
+ _, ctx = mustOpenIndex(t, dataDir)
+ ctx.ShutdownDendrite()
}
func TestIndex(t *testing.T) {
- fts := mustOpenIndex(t, "")
- defer fts.Close()
+ fts, ctx := mustOpenIndex(t, "")
+ defer ctx.ShutdownDendrite()
// add some data
var streamPos int64 = 1
@@ -128,8 +128,8 @@ func TestIndex(t *testing.T) {
}
func TestDelete(t *testing.T) {
- fts := mustOpenIndex(t, "")
- defer fts.Close()
+ fts, ctx := mustOpenIndex(t, "")
+ defer ctx.ShutdownDendrite()
eventIDs, roomIDs := mustAddTestData(t, fts, 0)
res1, err := fts.Search("lorem", roomIDs[:1], nil, 50, 0, false)
if err != nil {
@@ -224,7 +224,8 @@ func TestSearch(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- f := mustOpenIndex(t, "")
+ f, ctx := mustOpenIndex(t, "")
+ defer ctx.ShutdownDendrite()
eventIDs, roomIDs := mustAddTestData(t, f, 0)
var searchRooms []string
for _, x := range tt.args.roomIndex {
diff --git a/internal/fulltext/bleve_wasm.go b/internal/fulltext/bleve_wasm.go
index a69a8926..0053ed8c 100644
--- a/internal/fulltext/bleve_wasm.go
+++ b/internal/fulltext/bleve_wasm.go
@@ -15,8 +15,9 @@
package fulltext
import (
- "github.com/matrix-org/dendrite/setup/config"
"time"
+
+ "github.com/matrix-org/dendrite/setup/config"
)
type Search struct{}
@@ -28,6 +29,13 @@ type IndexElement struct {
StreamPosition int64
}
+type Indexer interface {
+ Index(elements ...IndexElement) error
+ Delete(eventID string) error
+ Search(term string, roomIDs, keys []string, limit, from int, orderByStreamPos bool) (SearchResult, error)
+ Close() error
+}
+
type SearchResult struct {
Status interface{} `json:"status"`
Request *interface{} `json:"request"`
@@ -48,7 +56,7 @@ func (f *Search) Close() error {
return nil
}
-func (f *Search) Index(e IndexElement) error {
+func (f *Search) Index(e ...IndexElement) error {
return nil
}
diff --git a/internal/httputil/routing.go b/internal/httputil/routing.go
index 0bd3655e..c733c8ce 100644
--- a/internal/httputil/routing.go
+++ b/internal/httputil/routing.go
@@ -15,7 +15,10 @@
package httputil
import (
+ "net/http"
"net/url"
+
+ "github.com/gorilla/mux"
)
// URLDecodeMapValues is a function that iterates through each of the items in a
@@ -33,3 +36,52 @@ func URLDecodeMapValues(vmap map[string]string) (map[string]string, error) {
return decoded, nil
}
+
+type Routers struct {
+ Client *mux.Router
+ Federation *mux.Router
+ Keys *mux.Router
+ Media *mux.Router
+ WellKnown *mux.Router
+ Static *mux.Router
+ DendriteAdmin *mux.Router
+ SynapseAdmin *mux.Router
+}
+
+func NewRouters() Routers {
+ r := Routers{
+ Client: mux.NewRouter().SkipClean(true).PathPrefix(PublicClientPathPrefix).Subrouter().UseEncodedPath(),
+ Federation: mux.NewRouter().SkipClean(true).PathPrefix(PublicFederationPathPrefix).Subrouter().UseEncodedPath(),
+ Keys: mux.NewRouter().SkipClean(true).PathPrefix(PublicKeyPathPrefix).Subrouter().UseEncodedPath(),
+ Media: mux.NewRouter().SkipClean(true).PathPrefix(PublicMediaPathPrefix).Subrouter().UseEncodedPath(),
+ WellKnown: mux.NewRouter().SkipClean(true).PathPrefix(PublicWellKnownPrefix).Subrouter().UseEncodedPath(),
+ Static: mux.NewRouter().SkipClean(true).PathPrefix(PublicStaticPath).Subrouter().UseEncodedPath(),
+ DendriteAdmin: mux.NewRouter().SkipClean(true).PathPrefix(DendriteAdminPathPrefix).Subrouter().UseEncodedPath(),
+ SynapseAdmin: mux.NewRouter().SkipClean(true).PathPrefix(SynapseAdminPathPrefix).Subrouter().UseEncodedPath(),
+ }
+ r.configureHTTPErrors()
+ return r
+}
+
+var NotAllowedHandler = WrapHandlerInCORS(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ w.Header().Set("Content-Type", "application/json")
+ _, _ = w.Write([]byte(`{"errcode":"M_UNRECOGNIZED","error":"Unrecognized request"}`)) // nolint:misspell
+}))
+
+var NotFoundCORSHandler = WrapHandlerInCORS(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusNotFound)
+ w.Header().Set("Content-Type", "application/json")
+ _, _ = w.Write([]byte(`{"errcode":"M_UNRECOGNIZED","error":"Unrecognized request"}`)) // nolint:misspell
+}))
+
+func (r *Routers) configureHTTPErrors() {
+ for _, router := range []*mux.Router{
+ r.Client, r.Federation, r.Keys,
+ r.Media, r.WellKnown, r.Static,
+ r.DendriteAdmin, r.SynapseAdmin,
+ } {
+ router.NotFoundHandler = NotFoundCORSHandler
+ router.MethodNotAllowedHandler = NotAllowedHandler
+ }
+}
diff --git a/internal/httputil/routing_test.go b/internal/httputil/routing_test.go
new file mode 100644
index 00000000..21e2bf48
--- /dev/null
+++ b/internal/httputil/routing_test.go
@@ -0,0 +1,38 @@
+package httputil
+
+import (
+ "net/http"
+ "net/http/httptest"
+ "path/filepath"
+ "testing"
+)
+
+func TestRoutersError(t *testing.T) {
+ r := NewRouters()
+
+ // not found test
+ rec := httptest.NewRecorder()
+ req := httptest.NewRequest(http.MethodGet, filepath.Join(PublicFederationPathPrefix, "doesnotexist"), nil)
+ r.Federation.ServeHTTP(rec, req)
+ if rec.Code != http.StatusNotFound {
+ t.Fatalf("unexpected status code: %d - %s", rec.Code, rec.Body.String())
+ }
+ if ct := rec.Header().Get("Content-Type"); ct != "application/json" {
+ t.Fatalf("unexpected content-type: %s", ct)
+ }
+
+ // not allowed test
+ r.DendriteAdmin.
+ Handle("/test", http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {})).
+ Methods(http.MethodPost)
+
+ rec = httptest.NewRecorder()
+ req = httptest.NewRequest(http.MethodGet, filepath.Join(DendriteAdminPathPrefix, "test"), nil)
+ r.DendriteAdmin.ServeHTTP(rec, req)
+ if rec.Code != http.StatusMethodNotAllowed {
+ t.Fatalf("unexpected status code: %d - %s", rec.Code, rec.Body.String())
+ }
+ if ct := rec.Header().Get("Content-Type"); ct != "application/json" {
+ t.Fatalf("unexpected content-type: %s", ct)
+ }
+}
diff --git a/internal/sqlutil/connection_manager.go b/internal/sqlutil/connection_manager.go
new file mode 100644
index 00000000..cefd9f80
--- /dev/null
+++ b/internal/sqlutil/connection_manager.go
@@ -0,0 +1,54 @@
+// Copyright 2023 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlutil
+
+import (
+ "database/sql"
+ "fmt"
+
+ "github.com/matrix-org/dendrite/setup/config"
+)
+
+type Connections struct {
+ db *sql.DB
+ writer Writer
+}
+
+func NewConnectionManager() Connections {
+ return Connections{}
+}
+
+func (c *Connections) Connection(dbProperties *config.DatabaseOptions) (*sql.DB, Writer, error) {
+ writer := NewDummyWriter()
+ if dbProperties.ConnectionString.IsSQLite() {
+ writer = NewExclusiveWriter()
+ }
+ if dbProperties.ConnectionString != "" || c.db == nil {
+ var err error
+ // Open a new database connection using the supplied config.
+ c.db, err = Open(dbProperties, writer)
+ if err != nil {
+ return nil, nil, err
+ }
+ c.writer = writer
+ return c.db, c.writer, nil
+ }
+ if c.db != nil && c.writer != nil {
+ // Ignore the supplied config and return the global pool and
+ // writer.
+ return c.db, c.writer, nil
+ }
+ return nil, nil, fmt.Errorf("no database connections configured")
+}
diff --git a/internal/sqlutil/connection_manager_test.go b/internal/sqlutil/connection_manager_test.go
new file mode 100644
index 00000000..610629d5
--- /dev/null
+++ b/internal/sqlutil/connection_manager_test.go
@@ -0,0 +1,56 @@
+package sqlutil_test
+
+import (
+ "reflect"
+ "testing"
+
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/test"
+)
+
+func TestConnectionManager(t *testing.T) {
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ conStr, close := test.PrepareDBConnectionString(t, dbType)
+ t.Cleanup(close)
+ cm := sqlutil.NewConnectionManager()
+
+ dbProps := &config.DatabaseOptions{ConnectionString: config.DataSource(string(conStr))}
+ db, writer, err := cm.Connection(dbProps)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ switch dbType {
+ case test.DBTypeSQLite:
+ _, ok := writer.(*sqlutil.ExclusiveWriter)
+ if !ok {
+ t.Fatalf("expected exclusive writer")
+ }
+ case test.DBTypePostgres:
+ _, ok := writer.(*sqlutil.DummyWriter)
+ if !ok {
+ t.Fatalf("expected dummy writer")
+ }
+ }
+
+ // test global db pool
+ dbGlobal, writerGlobal, err := cm.Connection(&config.DatabaseOptions{})
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !reflect.DeepEqual(db, dbGlobal) {
+ t.Fatalf("expected database connection to be reused")
+ }
+ if !reflect.DeepEqual(writer, writerGlobal) {
+ t.Fatalf("expected database writer to be reused")
+ }
+
+ // test invalid connection string configured
+ cm = sqlutil.NewConnectionManager()
+ _, _, err = cm.Connection(&config.DatabaseOptions{ConnectionString: "http://"})
+ if err == nil {
+ t.Fatal("expected an error but got none")
+ }
+ })
+}