aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-09-27 18:06:49 +0200
committerGitHub <noreply@github.com>2022-09-27 18:06:49 +0200
commit87be32ca2671173a4287a938932e543410a32c3a (patch)
tree92d74544fccf5f51873db4f55c04045562990b35 /syncapi/storage
parent6c67552bf9eee18f656d731adf646aa09c5d7c92 (diff)
Fulltext implementation using Bleve (#2675)
Based on #2480 This actually indexes events based on their event type. They are removed from the index if we receive a `m.room.redaction` event on the `OutputRoomEvent` stream. An admin endpoint is added to reindex all existing events. Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'syncapi/storage')
-rw-r--r--syncapi/storage/interface.go1
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go28
-rw-r--r--syncapi/storage/shared/syncserver.go8
-rw-r--r--syncapi/storage/sqlite3/output_room_events_table.go41
-rw-r--r--syncapi/storage/tables/interface.go1
5 files changed, 79 insertions, 0 deletions
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index ad3be420..dd03365e 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -161,6 +161,7 @@ type Database interface {
// returns "leave", the topological position and no error. If an error occurs, other than sql.ErrNoRows, returns that and an empty
// string as the membership.
SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error)
+ ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error)
}
type Presence interface {
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 041f9906..20a9ea42 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -166,6 +166,8 @@ const selectContextAfterEventSQL = "" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id ASC LIMIT $3"
+const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE id > $1 AND type = ANY($2) ORDER BY id ASC LIMIT $3"
+
type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
@@ -180,6 +182,7 @@ type outputRoomEventsStatements struct {
selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt
+ selectSearchStmt *sql.Stmt
}
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@@ -215,6 +218,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
{&s.selectContextEventStmt, selectContextEventSQL},
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
+ {&s.selectSearchStmt, selectSearchSQL},
}.Prepare(db)
}
@@ -632,3 +636,27 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
}
return result, rows.Err()
}
+
+func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
+ rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, afterID, pq.StringArray(types), limit)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed")
+
+ var eventID string
+ var id int64
+ result := make(map[int64]gomatrixserverlib.HeaderedEvent)
+ for rows.Next() {
+ var ev gomatrixserverlib.HeaderedEvent
+ var eventBytes []byte
+ if err = rows.Scan(&id, &eventID, &eventBytes); err != nil {
+ return nil, err
+ }
+ if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ return nil, err
+ }
+ result[id] = ev
+ }
+ return result, rows.Err()
+}
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index 215bad3a..47e3a991 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -1093,3 +1093,11 @@ func (d *Database) MaxStreamPositionForPresence(ctx context.Context) (types.Stre
func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) {
return d.Memberships.SelectMembershipForUser(ctx, nil, roomID, userID, pos)
}
+
+func (s *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
+ return s.OutputEvents.ReIndex(ctx, nil, limit, afterID, []string{
+ gomatrixserverlib.MRoomName,
+ gomatrixserverlib.MRoomTopic,
+ "m.room.message",
+ })
+}
diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go
index 1626e32e..6269f4fd 100644
--- a/syncapi/storage/sqlite3/output_room_events_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_table.go
@@ -115,6 +115,8 @@ const selectContextAfterEventSQL = "" +
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
+const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE type IN ($1) AND id > $2 LIMIT $3 ORDER BY id ASC"
+
type outputRoomEventsStatements struct {
db *sql.DB
streamIDStatements *StreamIDStatements
@@ -125,6 +127,7 @@ type outputRoomEventsStatements struct {
selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt
+ //selectSearchStmt *sql.Stmt - prepared at runtime
}
func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Events, error) {
@@ -157,6 +160,7 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even
{&s.selectContextEventStmt, selectContextEventSQL},
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
+ //{&s.selectSearchStmt, selectSearchSQL}, - prepared at runtime
}.Prepare(db)
}
@@ -628,3 +632,40 @@ func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs [
}
return
}
+
+func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
+ params := make([]interface{}, len(types))
+ for i := range types {
+ params[i] = types[i]
+ }
+ params = append(params, afterID)
+ params = append(params, limit)
+ selectSQL := strings.Replace(selectSearchSQL, "($1)", sqlutil.QueryVariadic(len(types)), 1)
+
+ stmt, err := s.db.Prepare(selectSQL)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, stmt, "selectEvents: stmt.close() failed")
+ rows, err := stmt.QueryContext(ctx, params...)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed")
+
+ var eventID string
+ var id int64
+ result := make(map[int64]gomatrixserverlib.HeaderedEvent)
+ for rows.Next() {
+ var ev gomatrixserverlib.HeaderedEvent
+ var eventBytes []byte
+ if err = rows.Scan(&id, &eventID, &eventBytes); err != nil {
+ return nil, err
+ }
+ if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ return nil, err
+ }
+ result[id] = ev
+ }
+ return result, rows.Err()
+}
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 9a873c2e..2a6d6fa8 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -75,6 +75,7 @@ type Events interface {
SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
+ ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error)
}
// Topology keeps track of the depths and stream positions for all events.