aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorAlex <d.lexand@gmail.com>2024-07-27 22:30:17 +0200
committerGitHub <noreply@github.com>2024-07-27 22:30:17 +0200
commit989795973103c463a33f053663c6a8616177186c (patch)
tree1f37145cbbd48fb75d3e8bfa4785ab6fa8e2832a /syncapi
parentaffb6977e43ad5051761d0de650370f421f751b5 (diff)
Fix: Edited messages appear twice in fulltext search (#3363)
As stated in https://github.com/matrix-org/dendrite/issues/3358 the search response contains both original and edited message. This PR fixes it by removing of the original message from the fulltext index after indexing the edit message event. I also made some cosmetic changes/fixes i found in the code Signed-off-by: `Alexander Dubovikov <d.lexand@gmail.com>`
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/roomserver.go18
-rw-r--r--syncapi/syncapi_test.go92
2 files changed, 110 insertions, 0 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 81c532f1..abf88882 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -601,9 +601,11 @@ func (s *OutputRoomEventConsumer) writeFTS(ev *rstypes.HeaderedEvent, pduPositio
}
e.SetContentType(ev.Type())
+ var relatesTo gjson.Result
switch ev.Type() {
case "m.room.message":
e.Content = gjson.GetBytes(ev.Content(), "body").String()
+ relatesTo = gjson.GetBytes(ev.Content(), "m\\.relates_to")
case spec.MRoomName:
e.Content = gjson.GetBytes(ev.Content(), "name").String()
case spec.MRoomTopic:
@@ -622,6 +624,22 @@ func (s *OutputRoomEventConsumer) writeFTS(ev *rstypes.HeaderedEvent, pduPositio
if err := s.fts.Index(e); err != nil {
return err
}
+ // If the event is an edited message we remove the original event from the index
+ // to avoid duplicates in the search results.
+ if relatesTo.Exists() {
+ relatedData := relatesTo.Map()
+ if _, ok := relatedData["rel_type"]; ok && relatedData["rel_type"].Str == "m.replace" {
+ // We remove the original event from the index
+ if srcEventID, ok := relatedData["event_id"]; ok {
+ if err := s.fts.Delete(srcEventID.Str); err != nil {
+ log.WithFields(log.Fields{
+ "event_id": ev.EventID(),
+ "src_id": srcEventID.Str,
+ }).WithError(err).Error("Failed to delete edited message from the fulltext index")
+ }
+ }
+ }
+ }
}
return nil
}
diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go
index 0392f209..d360e10d 100644
--- a/syncapi/syncapi_test.go
+++ b/syncapi/syncapi_test.go
@@ -4,12 +4,14 @@ import (
"context"
"encoding/json"
"fmt"
+ "io"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"
+ "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/sqlutil"
@@ -17,6 +19,7 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/nats-io/nats.go"
+ "github.com/stretchr/testify/assert"
"github.com/tidwall/gjson"
rstypes "github.com/matrix-org/dendrite/roomserver/types"
@@ -1324,6 +1327,95 @@ func TestUpdateRelations(t *testing.T) {
})
}
+func TestRemoveEditedEventFromSearchIndex(t *testing.T) {
+ user := test.NewUser(t)
+ alice := userapi.Device{
+ ID: "ALICEID",
+ UserID: user.ID,
+ AccessToken: "ALICE_BEARER_TOKEN",
+ DisplayName: "Alice",
+ AccountType: userapi.AccountTypeUser,
+ }
+
+ routers := httputil.NewRouters()
+
+ cfg, processCtx, close := testrig.CreateConfig(t, test.DBTypeSQLite)
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
+ defer close()
+
+ // Use an actual roomserver for this
+ natsInstance := jetstream.NATSInstance{}
+ jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
+
+ rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
+ rsAPI.SetFederationAPI(nil, nil)
+
+ room := test.NewRoom(t, user)
+ AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, caching.DisableMetrics)
+
+ if err := api.SendEvents(processCtx.Context(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
+ t.Fatalf("failed to send events: %v", err)
+ }
+
+ ev1 := room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{"body": "first"})
+ ev2 := room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{
+ "body": " * first",
+ "m.new_content": map[string]interface{}{
+ "body": "first",
+ "msgtype": "m.text",
+ },
+ "m.relates_to": map[string]interface{}{
+ "event_id": ev1.EventID(),
+ "rel_type": "m.replace",
+ },
+ })
+ events := []*rstypes.HeaderedEvent{ev1, ev2}
+
+ for _, e := range events {
+ roomEvents := append([]*rstypes.HeaderedEvent{}, e)
+ if err := api.SendEvents(processCtx.Context(), rsAPI, api.KindNew, roomEvents, "test", "test", "test", nil, false); err != nil {
+ t.Fatalf("failed to send events: %v", err)
+ }
+
+ syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool {
+ // wait for the last sent eventID to come down sync
+ path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, e.EventID())
+
+ return gjson.Get(syncBody, path).Exists()
+ })
+
+ // We search that event is the only one nad is the exact event we sent
+ searchResult := searchRequest(t, routers.Client, alice.AccessToken, "first", []string{room.ID})
+ results := gjson.GetBytes(searchResult, fmt.Sprintf(`search_categories.room_events.groups.room_id.%s.results`, room.ID))
+ assert.True(t, results.Exists(), "Should be a search response")
+ assert.Equal(t, 1, len(results.Array()), "Should be exactly one result")
+ assert.Equal(t, e.EventID(), results.Array()[0].String(), "Should be only found exact event")
+ }
+}
+
+func searchRequest(t *testing.T, router *mux.Router, accessToken, searchTerm string, roomList []string) []byte {
+ t.Helper()
+ w := httptest.NewRecorder()
+ rq := test.NewRequest(t, "POST", "/_matrix/client/v3/search", test.WithQueryParams(map[string]string{
+ "access_token": accessToken,
+ }), test.WithJSONBody(t, map[string]interface{}{
+ "search_categories": map[string]interface{}{
+ "room_events": map[string]interface{}{
+ "filters": roomList,
+ "search_term": searchTerm,
+ },
+ },
+ }))
+
+ router.ServeHTTP(w, rq)
+ assert.Equal(t, 200, w.Code)
+ defer w.Result().Body.Close()
+ body, err := io.ReadAll(w.Result().Body)
+ assert.NoError(t, err)
+ return body
+}
func syncUntil(t *testing.T,
routers httputil.Routers, accessToken string,
skip bool,