aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authordevonh <devon.dmytro@gmail.com>2023-06-06 20:55:18 +0000
committerGitHub <noreply@github.com>2023-06-06 20:55:18 +0000
commit7a1fd7f512ce06a472a2051ee63eae4a270eb71a (patch)
tree20128b0d3f7c69dd776aa7b2b9bc3194dda7dd75 /syncapi
parent725ff5567d2a3bc9992b065e72ccabefb595ec1c (diff)
PDU Sender split (#3100)
Initial cut of splitting PDU Sender into SenderID & looking up UserID where required.
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/roomserver.go2
-rw-r--r--syncapi/routing/context.go23
-rw-r--r--syncapi/routing/getevent.go7
-rw-r--r--syncapi/routing/memberships.go6
-rw-r--r--syncapi/routing/messages.go12
-rw-r--r--syncapi/routing/relations.go7
-rw-r--r--syncapi/routing/routing.go2
-rw-r--r--syncapi/routing/search.go46
-rw-r--r--syncapi/routing/search_test.go10
-rw-r--r--syncapi/storage/postgres/current_room_state_table.go2
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go2
-rw-r--r--syncapi/storage/shared/storage_consumer.go21
-rw-r--r--syncapi/storage/sqlite3/current_room_state_table.go2
-rw-r--r--syncapi/storage/sqlite3/output_room_events_table.go2
-rw-r--r--syncapi/streams/stream_invite.go12
-rw-r--r--syncapi/streams/stream_pdu.go38
-rw-r--r--syncapi/streams/streams.go1
-rw-r--r--syncapi/syncapi_test.go4
-rw-r--r--syncapi/synctypes/clientevent.go13
-rw-r--r--syncapi/synctypes/clientevent_test.go17
-rw-r--r--syncapi/types/types.go4
-rw-r--r--syncapi/types/types_test.go12
22 files changed, 186 insertions, 59 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 56285dbf..c0836465 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -523,7 +523,7 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *rstypes.HeaderedEvent)
prev := types.PrevEventRef{
PrevContent: prevEvent.Content(),
ReplacesState: prevEvent.EventID(),
- PrevSender: prevEvent.Sender(),
+ PrevSender: prevEvent.SenderID(),
}
event.PDU, err = event.SetUnsigned(prev)
diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go
index ac17d39d..27e99a35 100644
--- a/syncapi/routing/context.go
+++ b/syncapi/routing/context.go
@@ -193,14 +193,20 @@ func Context(
}
}
- eventsBeforeClient := synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsBeforeFiltered), synctypes.FormatAll)
- eventsAfterClient := synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsAfterFiltered), synctypes.FormatAll)
+ eventsBeforeClient := synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsBeforeFiltered), synctypes.FormatAll, func(roomID, senderID string) (*spec.UserID, error) {
+ return rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
+ })
+ eventsAfterClient := synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsAfterFiltered), synctypes.FormatAll, func(roomID, senderID string) (*spec.UserID, error) {
+ return rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
+ })
newState := state
if filter.LazyLoadMembers {
allEvents := append(eventsBeforeFiltered, eventsAfterFiltered...)
allEvents = append(allEvents, &requestedEvent)
- evs := synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(allEvents), synctypes.FormatAll)
+ evs := synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(allEvents), synctypes.FormatAll, func(roomID, senderID string) (*spec.UserID, error) {
+ return rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
+ })
newState, err = applyLazyLoadMembers(ctx, device, snapshot, roomID, evs, lazyLoadCache)
if err != nil {
logrus.WithError(err).Error("unable to load membership events")
@@ -211,12 +217,19 @@ func Context(
}
}
- ev := synctypes.ToClientEvent(&requestedEvent, synctypes.FormatAll)
+ sender := spec.UserID{}
+ userID, err := rsAPI.QueryUserIDForSender(ctx, requestedEvent.RoomID(), requestedEvent.SenderID())
+ if err == nil && userID != nil {
+ sender = *userID
+ }
+ ev := synctypes.ToClientEvent(&requestedEvent, synctypes.FormatAll, sender)
response := ContextRespsonse{
Event: &ev,
EventsAfter: eventsAfterClient,
EventsBefore: eventsBeforeClient,
- State: synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(newState), synctypes.FormatAll),
+ State: synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(newState), synctypes.FormatAll, func(roomID, senderID string) (*spec.UserID, error) {
+ return rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
+ }),
}
if len(response.State) > filter.Limit {
diff --git a/syncapi/routing/getevent.go b/syncapi/routing/getevent.go
index 0d3d412f..63df7e83 100644
--- a/syncapi/routing/getevent.go
+++ b/syncapi/routing/getevent.go
@@ -101,8 +101,13 @@ func GetEvent(
}
}
+ sender := spec.UserID{}
+ senderUserID, err := rsAPI.QueryUserIDForSender(req.Context(), roomID, events[0].SenderID())
+ if err == nil && senderUserID != nil {
+ sender = *senderUserID
+ }
return util.JSONResponse{
Code: http.StatusOK,
- JSON: synctypes.ToClientEvent(events[0], synctypes.FormatAll),
+ JSON: synctypes.ToClientEvent(events[0], synctypes.FormatAll, sender),
}
}
diff --git a/syncapi/routing/memberships.go b/syncapi/routing/memberships.go
index 7d2e137d..9c2319dd 100644
--- a/syncapi/routing/memberships.go
+++ b/syncapi/routing/memberships.go
@@ -144,7 +144,7 @@ func GetMemberships(
JSON: spec.InternalServerError{},
}
}
- res.Joined[ev.Sender()] = joinedMember(content)
+ res.Joined[ev.SenderID()] = joinedMember(content)
}
return util.JSONResponse{
Code: http.StatusOK,
@@ -153,6 +153,8 @@ func GetMemberships(
}
return util.JSONResponse{
Code: http.StatusOK,
- JSON: getMembershipResponse{synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(result), synctypes.FormatAll)},
+ JSON: getMembershipResponse{synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(result), synctypes.FormatAll, func(roomID, senderID string) (*spec.UserID, error) {
+ return rsAPI.QueryUserIDForSender(req.Context(), roomID, senderID)
+ })},
}
}
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index aeaec699..879739d0 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -241,7 +241,7 @@ func OnIncomingMessagesRequest(
device: device,
}
- clientEvents, start, end, err := mReq.retrieveEvents()
+ clientEvents, start, end, err := mReq.retrieveEvents(req.Context(), rsAPI)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("mreq.retrieveEvents failed")
return util.JSONResponse{
@@ -273,7 +273,9 @@ func OnIncomingMessagesRequest(
JSON: spec.InternalServerError{},
}
}
- res.State = append(res.State, synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(membershipEvents), synctypes.FormatAll)...)
+ res.State = append(res.State, synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(membershipEvents), synctypes.FormatAll, func(roomID, senderID string) (*spec.UserID, error) {
+ return rsAPI.QueryUserIDForSender(req.Context(), roomID, senderID)
+ })...)
}
// If we didn't return any events, set the end to an empty string, so it will be omitted
@@ -310,7 +312,7 @@ func getMembershipForUser(ctx context.Context, roomID, userID string, rsAPI api.
// homeserver in the room for older events.
// Returns an error if there was an issue talking to the database or with the
// remote homeserver.
-func (r *messagesReq) retrieveEvents() (
+func (r *messagesReq) retrieveEvents(ctx context.Context, rsAPI api.SyncRoomserverAPI) (
clientEvents []synctypes.ClientEvent, start,
end types.TopologyToken, err error,
) {
@@ -383,7 +385,9 @@ func (r *messagesReq) retrieveEvents() (
"events_before": len(events),
"events_after": len(filteredEvents),
}).Debug("applied history visibility (messages)")
- return synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(filteredEvents), synctypes.FormatAll), start, end, err
+ return synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(filteredEvents), synctypes.FormatAll, func(roomID, senderID string) (*spec.UserID, error) {
+ return rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
+ }), start, end, err
}
func (r *messagesReq) getStartEnd(events []*rstypes.HeaderedEvent) (start, end types.TopologyToken, err error) {
diff --git a/syncapi/routing/relations.go b/syncapi/routing/relations.go
index 8374bf5b..f21c684c 100644
--- a/syncapi/routing/relations.go
+++ b/syncapi/routing/relations.go
@@ -114,9 +114,14 @@ func Relations(
// type if it was specified.
res.Chunk = make([]synctypes.ClientEvent, 0, len(filteredEvents))
for _, event := range filteredEvents {
+ sender := spec.UserID{}
+ userID, err := rsAPI.QueryUserIDForSender(req.Context(), event.RoomID(), event.SenderID())
+ if err == nil && userID != nil {
+ sender = *userID
+ }
res.Chunk = append(
res.Chunk,
- synctypes.ToClientEvent(event.PDU, synctypes.FormatAll),
+ synctypes.ToClientEvent(event.PDU, synctypes.FormatAll, sender),
)
}
diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go
index 9ad0c047..8542c0b7 100644
--- a/syncapi/routing/routing.go
+++ b/syncapi/routing/routing.go
@@ -171,7 +171,7 @@ func Setup(
nb := req.FormValue("next_batch")
nextBatch = &nb
}
- return Search(req, device, syncDB, fts, nextBatch)
+ return Search(req, device, syncDB, fts, nextBatch, rsAPI)
}),
).Methods(http.MethodPost, http.MethodOptions)
diff --git a/syncapi/routing/search.go b/syncapi/routing/search.go
index b7191873..9cf3eabe 100644
--- a/syncapi/routing/search.go
+++ b/syncapi/routing/search.go
@@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -38,7 +39,7 @@ import (
)
// nolint:gocyclo
-func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts fulltext.Indexer, from *string) util.JSONResponse {
+func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts fulltext.Indexer, from *string, rsAPI roomserverAPI.SyncRoomserverAPI) util.JSONResponse {
start := time.Now()
var (
searchReq SearchRequest
@@ -204,11 +205,17 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
profileInfos := make(map[string]ProfileInfoResponse)
for _, ev := range append(eventsBefore, eventsAfter...) {
- profile, ok := knownUsersProfiles[event.Sender()]
+ userID, queryErr := rsAPI.QueryUserIDForSender(req.Context(), ev.RoomID(), ev.SenderID())
+ if queryErr != nil {
+ logrus.WithError(queryErr).WithField("sender_id", event.SenderID()).Warn("failed to query userprofile")
+ continue
+ }
+
+ profile, ok := knownUsersProfiles[userID.String()]
if !ok {
- stateEvent, err := snapshot.GetStateEvent(ctx, ev.RoomID(), spec.MRoomMember, ev.Sender())
- if err != nil {
- logrus.WithError(err).WithField("user_id", event.Sender()).Warn("failed to query userprofile")
+ stateEvent, stateErr := snapshot.GetStateEvent(ctx, ev.RoomID(), spec.MRoomMember, ev.SenderID())
+ if stateErr != nil {
+ logrus.WithError(stateErr).WithField("sender_id", event.SenderID()).Warn("failed to query userprofile")
continue
}
if stateEvent == nil {
@@ -218,21 +225,30 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
AvatarURL: gjson.GetBytes(stateEvent.Content(), "avatar_url").Str,
DisplayName: gjson.GetBytes(stateEvent.Content(), "displayname").Str,
}
- knownUsersProfiles[event.Sender()] = profile
+ knownUsersProfiles[userID.String()] = profile
}
- profileInfos[ev.Sender()] = profile
+ profileInfos[userID.String()] = profile
}
+ sender := spec.UserID{}
+ userID, err := rsAPI.QueryUserIDForSender(req.Context(), event.RoomID(), event.SenderID())
+ if err == nil && userID != nil {
+ sender = *userID
+ }
results = append(results, Result{
Context: SearchContextResponse{
- Start: startToken.String(),
- End: endToken.String(),
- EventsAfter: synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsAfter), synctypes.FormatSync),
- EventsBefore: synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsBefore), synctypes.FormatSync),
- ProfileInfo: profileInfos,
+ Start: startToken.String(),
+ End: endToken.String(),
+ EventsAfter: synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsAfter), synctypes.FormatSync, func(roomID, senderID string) (*spec.UserID, error) {
+ return rsAPI.QueryUserIDForSender(req.Context(), roomID, senderID)
+ }),
+ EventsBefore: synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsBefore), synctypes.FormatSync, func(roomID, senderID string) (*spec.UserID, error) {
+ return rsAPI.QueryUserIDForSender(req.Context(), roomID, senderID)
+ }),
+ ProfileInfo: profileInfos,
},
Rank: eventScore[event.EventID()].Score,
- Result: synctypes.ToClientEvent(event, synctypes.FormatAll),
+ Result: synctypes.ToClientEvent(event, synctypes.FormatAll, sender),
})
roomGroup := groups[event.RoomID()]
roomGroup.Results = append(roomGroup.Results, event.EventID())
@@ -247,7 +263,9 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
JSON: spec.InternalServerError{},
}
}
- stateForRooms[event.RoomID()] = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(state), synctypes.FormatSync)
+ stateForRooms[event.RoomID()] = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(state), synctypes.FormatSync, func(roomID, senderID string) (*spec.UserID, error) {
+ return rsAPI.QueryUserIDForSender(req.Context(), roomID, senderID)
+ })
}
}
diff --git a/syncapi/routing/search_test.go b/syncapi/routing/search_test.go
index 1cc95a87..b36be823 100644
--- a/syncapi/routing/search_test.go
+++ b/syncapi/routing/search_test.go
@@ -2,6 +2,7 @@ package routing
import (
"bytes"
+ "context"
"encoding/json"
"net/http"
"net/http/httptest"
@@ -9,6 +10,7 @@ import (
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rsapi "github.com/matrix-org/dendrite/roomserver/api"
rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -21,6 +23,12 @@ import (
"github.com/stretchr/testify/assert"
)
+type FakeSyncRoomserverAPI struct{ rsapi.SyncRoomserverAPI }
+
+func (f *FakeSyncRoomserverAPI) QueryUserIDForSender(ctx context.Context, roomID string, senderID string) (*spec.UserID, error) {
+ return spec.NewUserID(senderID, true)
+}
+
func TestSearch(t *testing.T) {
alice := test.NewUser(t)
aliceDevice := userapi.Device{UserID: alice.ID}
@@ -247,7 +255,7 @@ func TestSearch(t *testing.T) {
assert.NoError(t, err)
req := httptest.NewRequest(http.MethodPost, "/", reqBody)
- res := Search(req, tc.device, db, fts, tc.from)
+ res := Search(req, tc.device, db, fts, tc.from, &FakeSyncRoomserverAPI{})
if !tc.wantOK && !res.Is2xx() {
return
}
diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go
index 0cc96373..bfe5e9bd 100644
--- a/syncapi/storage/postgres/current_room_state_table.go
+++ b/syncapi/storage/postgres/current_room_state_table.go
@@ -343,7 +343,7 @@ func (s *currentRoomStateStatements) UpsertRoomState(
event.RoomID(),
event.EventID(),
event.Type(),
- event.Sender(),
+ event.SenderID(),
containsURL,
*event.StateKey(),
headeredJSON,
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 3aadbccf..e068afab 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -407,7 +407,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
event.EventID(),
headeredJSON,
event.Type(),
- event.Sender(),
+ event.SenderID(),
containsURL,
pq.StringArray(addState),
pq.StringArray(removeState),
diff --git a/syncapi/storage/shared/storage_consumer.go b/syncapi/storage/shared/storage_consumer.go
index ecfd418f..17a6a69c 100644
--- a/syncapi/storage/shared/storage_consumer.go
+++ b/syncapi/storage/shared/storage_consumer.go
@@ -195,7 +195,21 @@ func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.Strea
for i := 0; i < len(in); i++ {
out[i] = in[i].HeaderedEvent
if device != nil && in[i].TransactionID != nil {
- if device.UserID == in[i].Sender() && device.SessionID == in[i].TransactionID.SessionID {
+ userID, err := spec.NewUserID(device.UserID, true)
+ if err != nil {
+ logrus.WithFields(logrus.Fields{
+ "event_id": out[i].EventID(),
+ }).WithError(err).Warnf("Failed to add transaction ID to event")
+ continue
+ }
+ deviceSenderID, err := d.getSenderIDForUser(in[i].RoomID(), *userID)
+ if err != nil {
+ logrus.WithFields(logrus.Fields{
+ "event_id": out[i].EventID(),
+ }).WithError(err).Warnf("Failed to add transaction ID to event")
+ continue
+ }
+ if deviceSenderID == in[i].SenderID() && device.SessionID == in[i].TransactionID.SessionID {
err := out[i].SetUnsignedField(
"transaction_id", in[i].TransactionID.TransactionID,
)
@@ -210,6 +224,11 @@ func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.Strea
return out
}
+func (d *Database) getSenderIDForUser(roomID string, userID spec.UserID) (string, error) { // nolint
+ // TODO: Repalce with actual logic for pseudoIDs
+ return userID.String(), nil
+}
+
// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of
// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.
diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go
index 1b8632eb..e432e483 100644
--- a/syncapi/storage/sqlite3/current_room_state_table.go
+++ b/syncapi/storage/sqlite3/current_room_state_table.go
@@ -342,7 +342,7 @@ func (s *currentRoomStateStatements) UpsertRoomState(
event.RoomID(),
event.EventID(),
event.Type(),
- event.Sender(),
+ event.SenderID(),
containsURL,
*event.StateKey(),
headeredJSON,
diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go
index d63e7606..5a47aec4 100644
--- a/syncapi/storage/sqlite3/output_room_events_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_table.go
@@ -348,7 +348,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
event.EventID(),
headeredJSON,
event.Type(),
- event.Sender(),
+ event.SenderID(),
containsURL,
string(addStateJSON),
string(removeStateJSON),
diff --git a/syncapi/streams/stream_invite.go b/syncapi/streams/stream_invite.go
index becd863a..a8b0a7b6 100644
--- a/syncapi/streams/stream_invite.go
+++ b/syncapi/streams/stream_invite.go
@@ -10,6 +10,7 @@ import (
"github.com/matrix-org/gomatrixserverlib/spec"
+ "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -17,6 +18,7 @@ import (
type InviteStreamProvider struct {
DefaultStreamProvider
+ rsAPI api.SyncRoomserverAPI
}
func (p *InviteStreamProvider) Setup(
@@ -62,11 +64,17 @@ func (p *InviteStreamProvider) IncrementalSync(
}
for roomID, inviteEvent := range invites {
+ user := spec.UserID{}
+ sender, err := p.rsAPI.QueryUserIDForSender(ctx, inviteEvent.RoomID(), inviteEvent.SenderID())
+ if err == nil && sender != nil {
+ user = *sender
+ }
+
// skip ignored user events
- if _, ok := req.IgnoredUsers.List[inviteEvent.Sender()]; ok {
+ if _, ok := req.IgnoredUsers.List[user.String()]; ok {
continue
}
- ir := types.NewInviteResponse(inviteEvent)
+ ir := types.NewInviteResponse(inviteEvent, user)
req.Response.Rooms.Invite[roomID] = ir
}
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 0ea48a9d..8f83a089 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -376,20 +376,28 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
}
jr.Timeline.PrevBatch = &prevBatch
- jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync)
+ jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID, senderID string) (*spec.UserID, error) {
+ return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
+ })
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
jr.Timeline.Limited = (limited && len(events) == len(recentEvents)) || delta.NewlyJoined
- jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync)
+ jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID, senderID string) (*spec.UserID, error) {
+ return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
+ })
req.Response.Rooms.Join[delta.RoomID] = jr
case spec.Peek:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
// TODO: Apply history visibility on peeked rooms
- jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(recentEvents), synctypes.FormatSync)
+ jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(recentEvents), synctypes.FormatSync, func(roomID, senderID string) (*spec.UserID, error) {
+ return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
+ })
jr.Timeline.Limited = limited
- jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync)
+ jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID, senderID string) (*spec.UserID, error) {
+ return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
+ })
req.Response.Rooms.Peek[delta.RoomID] = jr
case spec.Leave:
@@ -398,11 +406,15 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
case spec.Ban:
lr := types.NewLeaveResponse()
lr.Timeline.PrevBatch = &prevBatch
- lr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync)
+ lr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID, senderID string) (*spec.UserID, error) {
+ return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
+ })
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
lr.Timeline.Limited = limited && len(events) == len(recentEvents)
- lr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync)
+ lr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID, senderID string) (*spec.UserID, error) {
+ return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
+ })
req.Response.Rooms.Leave[delta.RoomID] = lr
}
@@ -425,7 +437,7 @@ func applyHistoryVisibilityFilter(
for _, ev := range recentEvents {
if ev.StateKey() != nil {
stateTypes = append(stateTypes, ev.Type())
- senders = append(senders, ev.Sender())
+ senders = append(senders, ev.SenderID())
}
}
@@ -552,11 +564,15 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
}
jr.Timeline.PrevBatch = prevBatch
- jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync)
+ jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID, senderID string) (*spec.UserID, error) {
+ return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
+ })
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
jr.Timeline.Limited = limited && len(events) == len(recentEvents)
- jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(stateEvents), synctypes.FormatSync)
+ jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(stateEvents), synctypes.FormatSync, func(roomID, senderID string) (*spec.UserID, error) {
+ return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
+ })
return jr, nil
}
@@ -577,8 +593,8 @@ func (p *PDUStreamProvider) lazyLoadMembers(
// Add all users the client doesn't know about yet to a list
for _, event := range timelineEvents {
// Membership is not yet cached, add it to the list
- if _, ok := p.lazyLoadCache.IsLazyLoadedUserCached(device, roomID, event.Sender()); !ok {
- timelineUsers[event.Sender()] = struct{}{}
+ if _, ok := p.lazyLoadCache.IsLazyLoadedUserCached(device, roomID, event.SenderID()); !ok {
+ timelineUsers[event.SenderID()] = struct{}{}
}
}
// Preallocate with the same amount, even if it will end up with fewer values
diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go
index a35491ac..f25bc978 100644
--- a/syncapi/streams/streams.go
+++ b/syncapi/streams/streams.go
@@ -45,6 +45,7 @@ func NewSyncStreamProviders(
},
InviteStreamProvider: &InviteStreamProvider{
DefaultStreamProvider: DefaultStreamProvider{DB: d},
+ rsAPI: rsAPI,
},
SendToDeviceStreamProvider: &SendToDeviceStreamProvider{
DefaultStreamProvider: DefaultStreamProvider{DB: d},
diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go
index bc766e66..78c857ab 100644
--- a/syncapi/syncapi_test.go
+++ b/syncapi/syncapi_test.go
@@ -40,6 +40,10 @@ type syncRoomserverAPI struct {
rooms []*test.Room
}
+func (s *syncRoomserverAPI) QueryUserIDForSender(ctx context.Context, roomID string, senderID string) (*spec.UserID, error) {
+ return spec.NewUserID(senderID, true)
+}
+
func (s *syncRoomserverAPI) QueryLatestEventsAndState(ctx context.Context, req *rsapi.QueryLatestEventsAndStateRequest, res *rsapi.QueryLatestEventsAndStateResponse) error {
var room *test.Room
for _, r := range s.rooms {
diff --git a/syncapi/synctypes/clientevent.go b/syncapi/synctypes/clientevent.go
index c722fe60..66fb1d01 100644
--- a/syncapi/synctypes/clientevent.go
+++ b/syncapi/synctypes/clientevent.go
@@ -44,22 +44,27 @@ type ClientEvent struct {
}
// ToClientEvents converts server events to client events.
-func ToClientEvents(serverEvs []gomatrixserverlib.PDU, format ClientEventFormat) []ClientEvent {
+func ToClientEvents(serverEvs []gomatrixserverlib.PDU, format ClientEventFormat, userIDForSender spec.UserIDForSender) []ClientEvent {
evs := make([]ClientEvent, 0, len(serverEvs))
for _, se := range serverEvs {
if se == nil {
continue // TODO: shouldn't happen?
}
- evs = append(evs, ToClientEvent(se, format))
+ sender := spec.UserID{}
+ userID, err := userIDForSender(se.RoomID(), se.SenderID())
+ if err == nil && userID != nil {
+ sender = *userID
+ }
+ evs = append(evs, ToClientEvent(se, format, sender))
}
return evs
}
// ToClientEvent converts a single server event to a client event.
-func ToClientEvent(se gomatrixserverlib.PDU, format ClientEventFormat) ClientEvent {
+func ToClientEvent(se gomatrixserverlib.PDU, format ClientEventFormat, sender spec.UserID) ClientEvent {
ce := ClientEvent{
Content: spec.RawJSON(se.Content()),
- Sender: se.Sender(),
+ Sender: sender.String(),
Type: se.Type(),
StateKey: se.StateKey(),
Unsigned: spec.RawJSON(se.Unsigned()),
diff --git a/syncapi/synctypes/clientevent_test.go b/syncapi/synctypes/clientevent_test.go
index b914e64f..34179508 100644
--- a/syncapi/synctypes/clientevent_test.go
+++ b/syncapi/synctypes/clientevent_test.go
@@ -21,6 +21,7 @@ import (
"testing"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/gomatrixserverlib/spec"
)
func TestToClientEvent(t *testing.T) { // nolint: gocyclo
@@ -43,7 +44,11 @@ func TestToClientEvent(t *testing.T) { // nolint: gocyclo
if err != nil {
t.Fatalf("failed to create Event: %s", err)
}
- ce := ToClientEvent(ev, FormatAll)
+ userID, err := spec.NewUserID("@test:localhost", true)
+ if err != nil {
+ t.Fatalf("failed to create userID: %s", err)
+ }
+ ce := ToClientEvent(ev, FormatAll, *userID)
if ce.EventID != ev.EventID() {
t.Errorf("ClientEvent.EventID: wanted %s, got %s", ev.EventID(), ce.EventID)
}
@@ -62,8 +67,8 @@ func TestToClientEvent(t *testing.T) { // nolint: gocyclo
if !bytes.Equal(ce.Unsigned, ev.Unsigned()) {
t.Errorf("ClientEvent.Unsigned: wanted %s, got %s", string(ev.Unsigned()), string(ce.Unsigned))
}
- if ce.Sender != ev.Sender() {
- t.Errorf("ClientEvent.Sender: wanted %s, got %s", ev.Sender(), ce.Sender)
+ if ce.Sender != userID.String() {
+ t.Errorf("ClientEvent.Sender: wanted %s, got %s", userID.String(), ce.Sender)
}
j, err := json.Marshal(ce)
if err != nil {
@@ -98,7 +103,11 @@ func TestToClientFormatSync(t *testing.T) {
if err != nil {
t.Fatalf("failed to create Event: %s", err)
}
- ce := ToClientEvent(ev, FormatSync)
+ userID, err := spec.NewUserID("@test:localhost", true)
+ if err != nil {
+ t.Fatalf("failed to create userID: %s", err)
+ }
+ ce := ToClientEvent(ev, FormatSync, *userID)
if ce.RoomID != "" {
t.Errorf("ClientEvent.RoomID: wanted '', got %s", ce.RoomID)
}
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index 22c27fea..526a120d 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -539,7 +539,7 @@ type InviteResponse struct {
}
// NewInviteResponse creates an empty response with initialised arrays.
-func NewInviteResponse(event *types.HeaderedEvent) *InviteResponse {
+func NewInviteResponse(event *types.HeaderedEvent, userID spec.UserID) *InviteResponse {
res := InviteResponse{}
res.InviteState.Events = []json.RawMessage{}
@@ -552,7 +552,7 @@ func NewInviteResponse(event *types.HeaderedEvent) *InviteResponse {
// Then we'll see if we can create a partial of the invite event itself.
// This is needed for clients to work out *who* sent the invite.
- inviteEvent := synctypes.ToClientEvent(event.PDU, synctypes.FormatSync)
+ inviteEvent := synctypes.ToClientEvent(event.PDU, synctypes.FormatSync, userID)
inviteEvent.Unsigned = nil
if ev, err := json.Marshal(inviteEvent); err == nil {
res.InviteState.Events = append(res.InviteState.Events, ev)
diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go
index 8e0448fe..a79ce541 100644
--- a/syncapi/types/types_test.go
+++ b/syncapi/types/types_test.go
@@ -8,8 +8,13 @@ import (
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/gomatrixserverlib/spec"
)
+func UserIDForSender(roomID string, senderID string) (*spec.UserID, error) {
+ return spec.NewUserID(senderID, true)
+}
+
func TestSyncTokens(t *testing.T) {
shouldPass := map[string]string{
"s4_0_0_0_0_0_0_0_3": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0, 3}.String(),
@@ -56,7 +61,12 @@ func TestNewInviteResponse(t *testing.T) {
t.Fatal(err)
}
- res := NewInviteResponse(&types.HeaderedEvent{PDU: ev})
+ sender, err := spec.NewUserID("@neilalexander:matrix.org", true)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ res := NewInviteResponse(&types.HeaderedEvent{PDU: ev}, *sender)
j, err := json.Marshal(res)
if err != nil {
t.Fatal(err)